Having ticked off the basics with an Apache Kafka producer and consumer in Go, let’s now look at the AdminClient. It’s useful for inspecting metadata about the cluster, creating topics, and similar administrative tasks.
Contexts
To use some of the functions that the AdminClient provides I had to read up on Context, which I’d not encountered on my brief journey with Go so far. The tl;dr is that a context provides a clean way for functions to time out or cancel their operation across function calls. Or, as the package documentation puts it:
Package context defines the Context type, which carries deadlines, cancellation signals, and other request-scoped values across API boundaries and between processes.
This is what it looks like in operation. You define the context (in this case with a timeout):
import (
"context"
"time"
// …
)
// …
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Note that you can also do it like this…
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
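…but per the code comment in the example given it’s good practice to make sure cancel is called as soon as the context is finished with. If you discard it, the context’s resources are held until the timeout expires, and go vet will warn about the lost cancel function.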
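To see what cancel actually does, here is a minimal sketch using only the standard library. The helper name errAfterCancel is made up for this illustration; the point is just that once cancel has been called the context is finished, even though its five-second timeout is nowhere near up.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// errAfterCancel is a hypothetical helper for this illustration: it creates a
// context with a generous timeout, cancels it straight away, and returns the
// error that the context then reports.
func errAfterCancel() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	// Calling cancel ends the context immediately and releases its timer.
	cancel()
	return ctx.Err()
}

func main() {
	fmt.Println(errAfterCancel()) // context canceled
}
```

Calling cancel more than once is harmless, which is why the defer cancel() pattern is safe even if you have already cancelled explicitly.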
Another thing to note is the nice way to specify time periods in Go. Instead of having to check the API documentation each time as to whether you’re specifying seconds, microseconds, etc, and then doing the necessary maths on the time period that you want to specify, you can instead just use human-friendly notation such as:
- 5*time.Microsecond
- 30*time.Second
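A small sketch of how this behaves in practice, using only the standard library. The names requestTimeout and toMillis are hypothetical and exist only for this example. One thing worth knowing is that a Duration is a count of nanoseconds under the hood, so when an API does want a plain integer in some unit you divide by that unit rather than converting directly.

```go
package main

import (
	"fmt"
	"time"
)

// requestTimeout is a hypothetical setting used only for this illustration.
const requestTimeout = 5 * time.Second

// toMillis converts a Duration to a whole number of milliseconds, for APIs
// that take a plain integer timeout rather than a time.Duration.
func toMillis(d time.Duration) int {
	return int(d / time.Millisecond)
}

func main() {
	// Durations print in a human-readable form.
	fmt.Println(5 * time.Microsecond) // 5µs
	fmt.Println(30 * time.Second)     // 30s

	// A Duration is an int64 count of nanoseconds, so a bare integer
	// conversion yields nanoseconds, not seconds or milliseconds.
	fmt.Println(int64(requestTimeout))    // 5000000000
	fmt.Println(toMillis(requestTimeout)) // 5000
}
```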
So, having defined the context, we pass it when invoking a function call that requires it, such as ClusterID() in the AdminClient:
c, e := a.ClusterID(ctx)
More completely, the code looks something like this:
// Get the ClusterID
if c, e := a.ClusterID(ctx); e != nil {
	fmt.Printf("😢 Error getting ClusterID\n\tError: %v\n", e)
} else {
	fmt.Printf("✔️ ClusterID: %v\n", c)
}
If the context times out then an error is returned:
😢 Error getting ClusterID
Error: context.deadlineExceededError context deadline exceeded
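If you want to react to a timeout specifically, rather than just print it, you can compare the error against context.DeadlineExceeded. Here is a sketch that runs without Kafka: slowCall is a hypothetical stand-in for a network call like ClusterID(), and I'm assuming the client hands back the context's own error, as the output above suggests.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Hypothetical durations used only for this illustration.
const (
	shortWait = 20 * time.Millisecond
	longWait  = 500 * time.Millisecond
)

// slowCall is a hypothetical stand-in for a network call such as ClusterID().
// It takes `work` to finish, unless the context is done first.
func slowCall(ctx context.Context, work time.Duration) (string, error) {
	select {
	case <-time.After(work):
		return "result", nil
	case <-ctx.Done():
		// ctx.Err() reports why the context ended (deadline or cancel).
		return "", ctx.Err()
	}
}

// timedOut runs slowCall with the given timeout and reports whether the call
// failed because the deadline passed.
func timedOut(timeout, work time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	_, err := slowCall(ctx, work)
	return errors.Is(err, context.DeadlineExceeded)
}

func main() {
	fmt.Println(timedOut(shortWait, longWait)) // true: the work outlasts the timeout
	fmt.Println(timedOut(longWait, shortWait)) // false: the work finishes in time
}
```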
Note that if you want to use the context in successive calls, the timeout does not reset on each use. So if you have something like this:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Call the first thing
doSomething(ctx)
// Call the second thing
doSomethingElse(ctx)
The five-second timeout covers both calls, so the second function only gets whatever time the first one left over. If you want each call to get its own full timeout, you create a new context in between:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Call the first thing
doSomething(ctx)
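// Start the timer again with a fresh context. The earlier defer still holds
// the first cancel function, so defer the new one as well.
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()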
// Call the second thing
doSomethingElse(ctx)
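One way to avoid juggling ctx and cancel by hand is to wrap each call so that it gets its own context. This is only a sketch: callWithTimeout, step, sharedDeadline and separateDeadlines are all names I've made up, and step simply simulates work with a timer instead of talking to Kafka. It also shows the difference between sharing one deadline and giving each call its own.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Hypothetical timings: each step needs 200ms and each context allows 300ms.
const (
	budget = 300 * time.Millisecond
	work   = 200 * time.Millisecond
)

// callWithTimeout is a hypothetical helper: it gives fn its own deadline and
// releases the context as soon as fn returns.
func callWithTimeout(parent context.Context, d time.Duration, fn func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(parent, d)
	defer cancel()
	return fn(ctx)
}

// step is a stand-in for doSomething / doSomethingElse: it takes d to
// complete, or gives up when the context is done.
func step(d time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		select {
		case <-time.After(d):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// sharedDeadline runs two steps against one context, so the second step only
// gets the time that the first one left over.
func sharedDeadline() (first, second error) {
	ctx, cancel := context.WithTimeout(context.Background(), budget)
	defer cancel()
	first = step(work)(ctx)
	second = step(work)(ctx)
	return first, second
}

// separateDeadlines gives each step its own full budget.
func separateDeadlines() (first, second error) {
	first = callWithTimeout(context.Background(), budget, step(work))
	second = callWithTimeout(context.Background(), budget, step(work))
	return first, second
}

func main() {
	_, err := sharedDeadline()
	fmt.Println("shared deadline, second call:", err) // context deadline exceeded
	_, err = separateDeadlines()
	fmt.Println("separate deadlines, second call:", err) // <nil>
}
```

Because the helper defers cancel inside its own function, each context is released as soon as its call returns rather than at the end of main.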
AdminClient
The docs comprehensively list the functions available from the AdminClient. Here’s a simple example that uses some of them to list information about the cluster:
package main

import (
	"context"
	"fmt"
	"time"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {
	// --
	// Create AdminClient instance
	// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewAdminClient
	// Store the config
	cm := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092"}
	// Variable a holds the new AdminClient instance.
	a, e := kafka.NewAdminClient(&cm)
	// Check for errors in creating the AdminClient
	if e != nil {
		if ke, ok := e.(kafka.Error); ok {
			switch ec := ke.Code(); ec {
			case kafka.ErrInvalidArg:
				fmt.Printf("😢 Can't create the AdminClient because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
			default:
				fmt.Printf("😢 Can't create the AdminClient (Kafka error code %d)\n\tError: %v\n", ec, e)
			}
		} else {
			// It's not a kafka.Error
			fmt.Printf("😢 Oh noes, there's a generic error creating the AdminClient! %v", e.Error())
		}
	} else {
		fmt.Println("✔️ Created AdminClient")
		// Make sure we close it when we're done. This is deferred only after
		// the error check, because a may be nil if creation failed.
		defer a.Close()
		// Create a context for use when calling some of these functions
		// This lets you set a variable timeout on invoking these calls
		// If the timeout passes then an error is returned.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()
		// Get the ClusterID
		if c, e := a.ClusterID(ctx); e != nil {
			fmt.Printf("😢 Error getting ClusterID\n\tError: %v\n", e)
		} else {
			fmt.Printf("✔️ ClusterID: %v\n", c)
		}
		// Start the context timer again (otherwise it carries on from the original deadline)
		ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
		// The earlier defer holds the first cancel function, so defer this one too
		defer cancel()
		// Get the ControllerID
		if c, e := a.ControllerID(ctx); e != nil {
			fmt.Printf("😢 Error getting ControllerID\n\tError: %v\n", e)
		} else {
			fmt.Printf("✔️ ControllerID: %v\n", c)
		}
		// Get some metadata
		// GetMetadata takes its timeout as an int number of milliseconds, so
		// convert the Duration explicitly (int(5*time.Second) would be nanoseconds)
		if md, e := a.GetMetadata(nil, false, int(5*time.Second/time.Millisecond)); e != nil {
			fmt.Printf("😢 Error getting cluster Metadata\n\tError: %v\n", e)
		} else {
			// Print the originating broker info
			fmt.Printf("✔️ Metadata [Originating broker]\n")
			b := md.OriginatingBroker
			fmt.Printf("\t[ID %d] %v\n", b.ID, b.Host)
			// Print the brokers
			fmt.Printf("✔️ Metadata [brokers]\n")
			for _, b := range md.Brokers {
				fmt.Printf("\t[ID %d] %v:%d\n", b.ID, b.Host, b.Port)
			}
			// Print the topics
			fmt.Printf("✔️ Metadata [topics]\n")
			for _, t := range md.Topics {
				fmt.Printf("\t(%v partitions)\t%v\n", len(t.Partitions), t.Topic)
			}
		}
		fmt.Printf("\n\n👋 … and we're done.\n")
	}
}
The output looks like this:
✔️ Created AdminClient
✔️ ClusterID: hukPYvRVTF2nU8efMXUq6g
✔️ ControllerID: 1
✔️ Metadata [Originating broker]
[ID 1] localhost:9092/1
✔️ Metadata [brokers]
[ID 1] localhost:9092
✔️ Metadata [topics]
(5 partitions) _kafka-connect-01-status
(1 partitions) ratings
(1 partitions) __confluent.support.metrics
(25 partitions) _kafka-connect-01-offsets
(1 partitions) _kafka-connect-01-configs
(50 partitions) __consumer_offsets
👋 … and we're done.
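A closing note on the error handling in the program above, which type-asserts the error to get at its code. A plain type assertion only matches if the error hasn't been wrapped along the way, and errors.As is the standard library's way around that. The sketch below uses hypothetical stand-in types (errCode, codedError) so that it runs without the Kafka client; I'm assuming the same pattern would carry over to kafka.Error, but I haven't shown that here.

```go
package main

import (
	"errors"
	"fmt"
)

// errCode and codedError are hypothetical stand-ins for kafka.ErrorCode and
// kafka.Error, so that this sketch runs without a Kafka client installed.
type errCode int

const (
	errInvalidArg errCode = iota + 1
	errTransport
)

type codedError struct {
	code errCode
	msg  string
}

func (e codedError) Error() string { return e.msg }
func (e codedError) Code() errCode { return e.code }

// describe classifies an error by its code, using errors.As so that wrapped
// errors are matched too.
func describe(err error) string {
	var ce codedError
	if errors.As(err, &ce) {
		switch ce.Code() {
		case errInvalidArg:
			return "bad configuration"
		default:
			return fmt.Sprintf("client error (code %d)", ce.Code())
		}
	}
	return "generic error"
}

func main() {
	wrapped := fmt.Errorf("creating client: %w", codedError{errInvalidArg, "invalid argument"})
	fmt.Println(describe(wrapped))                          // bad configuration
	fmt.Println(describe(codedError{errTransport, "down"})) // client error (code 2)
	fmt.Println(describe(errors.New("boom")))               // generic error
}
```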