
How to build a Conduit Connector


In this article, we are going to walk through, step by step, how to build a Conduit connector.

Conduit connectors exchange records with Conduit. A source connector reads from an external system and writes records into the pipeline, while a destination connector receives records from the pipeline and writes them to an external system.

For this example, we are going to build an Algolia destination connector. The goal of this connector is to give the user the ability to send data to Algolia. In the context of search engines, this is called indexing. Since Conduit is a generic tool to move data between data infrastructure, with this new connector we can index data from any Conduit Source (PostgreSQL, Kafka, etc.).

You may find this full example on GitHub.

Let's build!

How it works

We are implementing a Conduit connector that will be able to index data into Algolia. This will allow users to move data from any Conduit Source to Algolia. This connector will be a destination connector.

To implement a Conduit destination connector, we need to write four methods:

  1. Configure - Initialize and validate parameters. We'll need our Algolia application ID and API key, which we can get from Algolia.
  2. Open - We'll open our connection. We'll use the Algolia client to create an index. An index is like a database table in Algolia.
  3. Write - We'll take our records and write them to our Algolia index.
  4. Teardown - We'll close our connection and clean up any resources we've used.

Connectors can be created in many programming languages because Conduit Connectors leverage gRPC. For this example, we are going to use the Go Connector SDK.

Also, for this example, the datastore we are building the connector for is Algolia. Algolia is a fully managed search engine. As a destination, this connector allows for real-time syncing between any Conduit Source and Algolia.

To communicate with Algolia, we can use the Algolia Go library.

Connector File Structure

Next, let's take a look at an example connector file structure:

  • destination.go - The main destination code and where you interact with Algolia.
  • object.go - Helper functions to serialize records into JSON for Algolia.
  • spec.go - The specification defines connector metadata and parameters.
  • cmd/algolia/main.go - The entrypoint for the connector.

Setup

Let's bootstrap our new connector. We are going to name our connector conduit-connector-algolia. It's best practice to use the conduit-connector-NAME format so the community can find connectors.

  1. Create a new folder for your connector and change into it:
mkdir -p /where/you/want/your/connector
cd /where/you/want/your/connector
  2. Initialize the Go module (the module path matches the import used in the entrypoint):
go mod init github.com/conduitio/conduit-connector-algolia
  3. Create the main.go entrypoint:
mkdir -p cmd/algolia && touch cmd/algolia/main.go
  4. Create the remaining connector files:
touch destination.go spec.go object.go

These are the files that we will be working with to build our connector.

  5. Install the Go dependencies:
go get github.com/conduitio/conduit-connector-sdk
go get github.com/algolia/algoliasearch-client-go/v3

These dependencies are required for the connector to run: the Conduit Connector SDK provides the connector interfaces, and the Algolia Go client talks to the Algolia API.

Specification

First, let's build spec.go. The Specification defines the metadata for your connector.

package algolia

import sdk "github.com/conduitio/conduit-connector-sdk"

func Specification() sdk.Specification {
	return sdk.Specification{
		Name:        "algolia",
		Summary:     "A destination connector for Algolia",
		Description: "This connector allows for real-time syncing between any Conduit Source and Algolia.",
		Version:     "v0.1.0",
		Author:      "Meroxa, Inc.",
		DestinationParams: map[string]sdk.Parameter{
			DestinationConfigAPIKey: {
				Required:    true,
				Description: "The API key for Algolia.",
			},
			DestinationConfigApplicationID: {
				Required:    true,
				Description: "The Application ID for Algolia.",
			},
			DestinationConfigIndexName: {
				Required:    true,
				Description: "The Algolia index where records get written into.",
			},
		},
		SourceParams: nil,
	}
}

Here's what the above file is doing:

  1. It imports the conduit-connector-sdk package.
  2. It defines a function called Specification that returns a sdk.Specification struct.

This function defines the metadata for the connector. It defines the name, summary, description, version, author and the parameters that the user can configure for the connector.
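The specification refers to three constants, such as DestinationConfigAPIKey, that are not shown in this article. A minimal sketch of how they might be declared follows. The string values are an assumption taken from the config keys used in the pipeline examples later in this article (api_key, application_id, index_name), and `requiredParams` is a hypothetical helper added only for illustration.

```go
package main

import "fmt"

// Assumed values: these key names mirror the "config" object used in the
// pipeline examples of this article. The real connector may define them
// differently.
const (
	DestinationConfigAPIKey        = "api_key"
	DestinationConfigApplicationID = "application_id"
	DestinationConfigIndexName     = "index_name"
)

// requiredParams is a hypothetical helper that lists the configuration keys
// the specification marks as required.
func requiredParams() []string {
	return []string{
		DestinationConfigAPIKey,
		DestinationConfigApplicationID,
		DestinationConfigIndexName,
	}
}

func main() {
	for _, key := range requiredParams() {
		fmt.Println(key)
	}
}
```

Declaring the keys once as constants keeps the specification and the configuration parsing in agreement.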

Destination

The destination.go file implements the connector lifecycle methods and is where you interact with Algolia.

...

type Destination struct {
	sdk.UnimplementedDestination

	config DestinationConfig
	index  *search.Index
}

type DestinationConfig struct {
	APIKey        string
	ApplicationID string
	IndexName     string
}
...

First, we define two structs:

  • Destination - implements the sdk.Destination interface. This is the "storage" struct of your connector.
  • DestinationConfig - contains the configuration for the destination.

Configure

Configure is always the first function to be called in a connector and is used to validate and store the connector configuration.

func (d *Destination) Configure(ctx context.Context, cfg map[string]string) error {
	destCfg := DestinationConfig{
		APIKey:        cfg[DestinationConfigAPIKey],
		ApplicationID: cfg[DestinationConfigApplicationID],
		IndexName:     cfg[DestinationConfigIndexName],
	}

	if destCfg.APIKey == "" {
		return fmt.Errorf("%q is a required parameter", DestinationConfigAPIKey)
	}
	if destCfg.ApplicationID == "" {
		return fmt.Errorf("%q is a required parameter", DestinationConfigApplicationID)
	}
	if destCfg.IndexName == "" {
		return fmt.Errorf("%q is a required parameter", DestinationConfigIndexName)
	}

	d.config = destCfg
	return nil
}

In the above example, we are validating the configuration and storing it in the Destination struct.
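As the number of parameters grows, the repeated checks can be folded into a loop. The sketch below is one possible variation, not the connector's actual code: `missingKeys` and `validate` are hypothetical helpers, and the key names are assumed to match the config keys used in the pipeline examples in this article. Unlike the version above, it reports every missing parameter in a single error.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// missingKeys returns the required keys that are absent or empty in cfg,
// sorted so that error messages are stable.
func missingKeys(cfg map[string]string, required ...string) []string {
	var missing []string
	for _, key := range required {
		if cfg[key] == "" {
			missing = append(missing, key)
		}
	}
	sort.Strings(missing)
	return missing
}

// validate returns an error naming every missing parameter, or nil if all
// required parameters are present.
func validate(cfg map[string]string, required ...string) error {
	if missing := missingKeys(cfg, required...); len(missing) > 0 {
		return fmt.Errorf("missing required parameters: %s", strings.Join(missing, ", "))
	}
	return nil
}

func main() {
	// Only one of the three assumed keys is set.
	cfg := map[string]string{"api_key": "secret"}
	err := validate(cfg, "api_key", "application_id", "index_name")
	fmt.Println(err) // prints: missing required parameters: application_id, index_name
}
```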

Open

The Open method is called after Configure. It is used to open the connection to the destination.

func (d *Destination) Open(ctx context.Context) error {
	client := search.NewClient(d.config.ApplicationID, d.config.APIKey)
	index := client.InitIndex(d.config.IndexName)
	d.index = index
	return nil
}

In the above code, we are initializing the Algolia client and index.

Write

The Write method is called when a record is received from the Conduit Source. It is used to write the record to the destination.

func (d *Destination) Write(ctx context.Context, record sdk.Record) error {
	// Keep the response so that its task and object IDs can be logged below.
	res, err := d.index.SaveObject(Object(record))
	if err != nil {
		return fmt.Errorf("could not save object: %w", err)
	}

	sdk.Logger(ctx).Debug().
		Int("taskId", res.TaskID).
		Str("objectId", res.ObjectID).
		Msg("saved object")

	return nil
}

Here, we are saving the record to Algolia.

Note: The Object type serializes the record. This is implemented in object.go.
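Since object.go is not shown here, the following is only a rough sketch of what such serialization could look like, using the standard library alone. The `record` struct is a hypothetical, simplified stand-in for sdk.Record, and `toObject` is an illustrative function rather than the connector's actual Object type. It assumes the payload is a JSON object and uses the record key as Algolia's objectID attribute.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// record is a hypothetical, simplified stand-in for sdk.Record:
// a key plus a JSON-encoded payload.
type record struct {
	Key     []byte
	Payload []byte
}

// toObject decodes the payload as a JSON object and sets the objectID
// attribute, which Algolia uses to identify an object, from the record key.
func toObject(r record) (map[string]interface{}, error) {
	obj := map[string]interface{}{}
	if err := json.Unmarshal(r.Payload, &obj); err != nil {
		return nil, fmt.Errorf("payload is not a JSON object: %w", err)
	}
	if obj == nil {
		// A literal JSON null decodes to a nil map; start from an empty one.
		obj = map[string]interface{}{}
	}
	obj["objectID"] = string(r.Key)
	return obj, nil
}

func main() {
	obj, err := toObject(record{Key: []byte("42"), Payload: []byte(`{"name":"conduit"}`)})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	out, err := json.Marshal(obj)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(out)) // prints {"name":"conduit","objectID":"42"}
}
```

Reusing the record key as the objectID means that writing the same record twice updates one Algolia object instead of creating a duplicate.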

Teardown

The Teardown method is called when the connector is being stopped. It is used to close the connection to the destination.


func (d *Destination) Teardown(ctx context.Context) error {
	// do nothing
	return nil
}

Here no action is required.

Entrypoint

Next, let's look at cmd/algolia/main.go. This is the entrypoint for the connector.

package main

import (
	algolia "github.com/conduitio/conduit-connector-algolia"
	sdk "github.com/conduitio/conduit-connector-sdk"
)

func main() {
	sdk.Serve(
		algolia.Specification,
		nil,
		algolia.NewDestination,
	)
}

sdk.Serve starts the plugin and takes care of its entire lifecycle. Notice nil as the second parameter: that slot is for a source constructor, and this connector is only a destination.

Building

Now that we have all the required methods implemented, we can build our new connector:

go build -o algolia cmd/algolia/main.go

This will compile our connector to a binary.

Creating Pipeline with a new Connector

Now that we have our connector built, we can create a new pipeline using the Generator Source as our Conduit source. The Generator Source is a simple source that generates records. Our pipeline will then send the records to our connector to be written to Algolia.

To use your new connector within a pipeline, perform the following steps:

  1. Start Conduit.
  2. Create Pipeline:
curl -X 'POST' \
  'http://localhost:8080/v1/pipelines' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
    "config": {
      "name": "my-pipeline",
      "description": "This pipeline is for testing"
    }
  }'
  3. Create Generator Source:

This source will produce random records every 5 seconds.

curl -X 'POST' \
  'http://localhost:8080/v1/connectors' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
    "type": "TYPE_SOURCE",
    "plugin": "builtin:generator",
    "config": {
      "readTime": "5s",
      "fields": "id:int"
    }
  }'

Note: The plugin field can take either a path to the compiled connector binary or a name of a builtin plugin like builtin:generator.

  4. Create Algolia Destination (your new connector):
curl -X 'POST' \
  'http://localhost:8080/v1/connectors' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
    "type": "TYPE_DESTINATION",
    "plugin": "/path/to/algolia",
    "config": {
      "api_key": "",
      "application_id": "",
      "index_name": ""
    }
  }'
  5. Start the pipeline.

Now, you should be able to see the records being generated and written to Algolia.

What's Next

In this guide, we have covered the basics of how to build a connector for Conduit. The best way to learn more about what's possible with Conduit is to check out the Conduit documentation, the Conduit API documentation, and the Conduit SDK.

You may also see examples of the methods above within the following connectors:

I can't wait to see what you build 🚀. If you have any questions or feedback: