rmoff's random ramblings
Learning Golang (some rough notes) - S02E02 - Adding error handling to the Producer

Published Jul 10, 2020 by Robin Moffatt in Go, Golang, Kafka, Kafka Producer API, Error Handling, Type Assertion at https://preview.rmoff.net/2020/07/10/learning-golang-some-rough-notes-s02e02-adding-error-handling-to-the-producer/

I looked last time at the very bare basics of writing a Kafka producer using Go. It worked, but only with everything lined up and pointing the right way. There was no error handling of any sort. Let’s see about fixing this now.

A bit of code tidying

To make the code more readable to me, I split out the configuration into a new variable that’s then passed to the NewProducer, so instead of the more compact but possibly less readable

p, _ := kafka.NewProducer(&kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092"})

I have this:

c := kafka.ConfigMap{
    "bootstrap.servers": "localhost:9092"}

p, _ := kafka.NewProducer(&c)

Catching errors from NewProducer()

Instead of p, _ when invoking NewProducer (the _ denoting an empty placeholder variable), we supply a variable into which the error can be stored:

p, e := kafka.NewProducer(&c)

Now we can check it and report back if there’s a problem:

if e != nil {
    fmt.Printf("😢 Oh noes, there's an error creating the Producer! %v", e)
} else {
    // do producing stuff
}

If I make a deliberate mistake (here, misspelling bootstrap.servers as boostrap.servers) so that NewProducer returns an error, we get this:

😢 Oh noes, there's an error creating the Producer! No such configuration property: "boostrap.servers"

Be Assertive!

We can take this a step further and look at the type of error that’s returned. Whilst e is a generic interface for holding an error, we can try converting it to a kafka.Error. Why would we want to do this? Well, kafka.Error exposes more detail, including an ErrorCode that describes the type of error. From that code we can handle the error in a more useful way than just dumping it to the screen. For example, if it’s an error about configuration properties (as in the example above) we could tell the user where to find the reference information for this; including that pointer when the problem lies elsewhere would be redundant (and possibly confusing).

To cast it to a kafka.Error we use a type assertion:

ke := e.(kafka.Error)

We also check for success when we do it, just in case it’s not actually a kafka.Error (the single-value form above panics if the assertion fails):

if ke, ok := e.(kafka.Error); ok {
    // it's a kafka.Error
} else {
    // it's an error, but not a kafka.Error
}
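To convince myself of the difference between the two forms of type assertion, here's a minimal sketch. It doesn't use the Kafka library at all: codedError is a made-up stand-in for kafka.Error, and errOther is just a plain error, so treat it as an illustration of the Go mechanics rather than of the client itself.

```go
package main

import (
	"errors"
	"fmt"
)

// codedError is a hypothetical stand-in for kafka.Error, used only so that
// this sketch runs without the Kafka client library.
type codedError struct {
	code int
	msg  string
}

// Error makes codedError satisfy the built-in error interface.
func (c codedError) Error() string { return c.msg }

// errOther is a plain error whose dynamic type is not codedError.
var errOther = errors.New("something else")

// describe uses the two-value ("comma ok") assertion, which never panics.
func describe(e error) string {
	if ce, ok := e.(codedError); ok {
		return fmt.Sprintf("coded error %d: %s", ce.code, ce.msg)
	}
	return "plain error: " + e.Error()
}

// assertOrRecover uses the single-value assertion, which panics when the
// dynamic type is not codedError. The deferred recover reports whether
// that happened.
func assertOrRecover(e error) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	_ = e.(codedError)
	return false
}

func main() {
	fmt.Println(describe(codedError{code: -186, msg: "bad config"}))
	fmt.Println(describe(errOther))
	fmt.Println("single-value form panicked:", assertOrRecover(errOther))
}
```

The two-value form is the one to reach for whenever the error might have come from somewhere other than the library you're expecting.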

Once we’ve established that it’s a kafka.Error we can use the Code() function to access the ErrorCode and handle it with a switch:

if e != nil {
    if ke, ok := e.(kafka.Error); ok {
        // It's a kafka.Error, so we can act on its error code
        switch ec := ke.Code(); ec {
        case kafka.ErrInvalidArg:
            fmt.Printf("😢 Can't create the producer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
        default:
            fmt.Printf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)
        }
    } else {
        // It's not a kafka.Error
        fmt.Printf("😢 Oh noes, there's a generic error creating the Producer! %v", e.Error())
    }
}

So now the same mistake as before in configuring bootstrap.servers is caught and reported like this:

😢 Can't create the producer because you've configured it wrong (code: -186)!
	No such configuration property: "boostrap.servers"

To see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
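A variation I'd consider, sketched here under assumptions: since Go 1.13 the standard library's errors.As does the same job as the type assertion, and also finds the error if something has wrapped it with fmt.Errorf and %w along the way. The types below (errCode, codedError) and the codeOther value are hypothetical stand-ins so that the example runs without the Kafka library; only the -186 is taken from the output above.

```go
package main

import (
	"errors"
	"fmt"
)

// errCode is a hypothetical stand-in for kafka.ErrorCode.
type errCode int

const (
	codeInvalidArg errCode = -186 // the code seen in the output above
	codeOther      errCode = -1   // made up for this sketch
)

// codedError is a hypothetical stand-in for kafka.Error.
type codedError struct {
	code errCode
	msg  string
}

func (c codedError) Error() string { return c.msg }
func (c codedError) Code() errCode { return c.code }

// Sample errors used by main.
var (
	errBadConfig = codedError{code: codeInvalidArg, msg: `No such configuration property: "boostrap.servers"`}
	errWrapped   = fmt.Errorf("creating producer: %w", errBadConfig)
	errPlain     = errors.New("disk full")
)

// advise picks a message based on the error code. errors.As walks the
// chain of wrapped errors looking for a codedError.
func advise(err error) string {
	var ce codedError
	if !errors.As(err, &ce) {
		return "generic error: " + err.Error()
	}
	switch ce.Code() {
	case codeInvalidArg:
		return "check your configuration: " + ce.Error()
	default:
		return fmt.Sprintf("error code %d: %s", ce.Code(), ce.Error())
	}
}

func main() {
	fmt.Println(advise(errBadConfig))
	fmt.Println(advise(errWrapped))
	fmt.Println(advise(errPlain))
}
```

A plain type assertion would miss errWrapped, because its dynamic type is the wrapper and not codedError.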

Handling Producer Errors

When it comes to the Produce I’ve done the same as above - split out the creation of the message into a new variable for clarity, and added a check for error:

// Build the message
m := kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic},
    Value: []byte("Hello world")}

// Produce the message
if e := p.Produce(&m, nil); e != nil {
    fmt.Printf("😢 Darn, there's an error producing the message! %v", e.Error())
}

But if I kill my Kafka broker and run my code, I don’t get an error. Why not? Because if you look at the documentation for Produce you’ll see that it says

This is an asynchronous call that enqueues the message on the internal transmit queue, thus returning immediately

So all we’re doing is "fire and forget". Put it on an internal queue, and we’re done. We still don’t know if it was delivered.
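To picture why no error comes back, here's a toy model of "enqueue and return". It is not how the client is implemented; producer, produce and errQueueFull are names I've made up, and the internal queue is modelled as a buffered channel that nothing ever reads from (our "broker" is down).

```go
package main

import (
	"errors"
	"fmt"
)

// errQueueFull is a hypothetical error used only in this sketch.
var errQueueFull = errors.New("queue full")

// producer is a toy stand-in for the real Producer. Its internal transmit
// queue is modelled as a buffered channel.
type producer struct {
	queue chan string
}

func newProducer(size int) *producer {
	return &producer{queue: make(chan string, size)}
}

// produce enqueues the message without blocking. It knows nothing about
// delivery; the only thing it can report is a full local queue.
func (p *producer) produce(msg string) error {
	select {
	case p.queue <- msg:
		return nil
	default:
		return errQueueFull
	}
}

func main() {
	p := newProducer(2) // nothing reads from the queue: the broker is down

	fmt.Println(p.produce("Hello world"))  // accepted, even with no broker
	fmt.Println(p.produce("Hello again"))  // accepted too
	fmt.Println(p.produce("One too many")) // only now do we see an error
	fmt.Println("messages waiting:", len(p.queue))
}
```

So a nil from the produce call only tells us the message was accepted locally, which is why we need another route to find out what happened to it.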

Producer Events

Since Produce is called asynchronously, the Producer uses Go channels to provide events back to the calling application about what’s going on. These events can be of different types, covering both errors and the status of messages that have been sent for producing.

  • Error events contain … errors ;-)

  • Message events contain information about messages that have been sent for producing, including whether it worked or not.

You can consume these events in two different ways:

  1. Poll the producer’s Events() channel and triage by type

  2. Create a dedicated delivery report channel, and poll Events() for errors only

Here’s an example of the first option, in which we use a Go Routine to listen to all events and handle them based on type:

go func() {
    for {
        // The `select` blocks until one of the `case` conditions
        // are met - therefore we run it in a Go Routine.
        select {
            case ev := <-p.Events():
                // Look at the type of Event we've received
                switch ev.(type) {

                case *kafka.Message:
                    // It's a delivery report
                    km := ev.(*kafka.Message)
                    if km.TopicPartition.Error != nil {
                        fmt.Printf("☠️ Failed to send message '%v' to topic '%v'\n\tErr: %v",
                            string(km.Value),
                            string(*km.TopicPartition.Topic),
                            km.TopicPartition.Error)
                    } else {
                        fmt.Printf("✅ Message '%v' delivered to topic '%v' (partition %d at offset %d)\n",
                            string(km.Value),
                            string(*km.TopicPartition.Topic),
                            km.TopicPartition.Partition,
                            km.TopicPartition.Offset)
                    }

                case kafka.Error:
                    // It's an error
                    em := ev.(kafka.Error)
                    fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)
                default:
                    // It's not anything we were expecting
                    fmt.Printf("Got an event that's not a Message or Error 👻\n\t%v\n", ev)

                }
        }
    }
}()
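A small tidy-up that I think is worth knowing about: a type switch can bind the asserted value to a variable in one go (switch e := ev.(type)), which saves the second assertion inside each case. The sketch below uses made-up stand-in types (event, message, clientError) so that it runs on its own without the Kafka library.

```go
package main

import "fmt"

// event is a hypothetical stand-in for kafka.Event.
type event interface{}

// clientError is a hypothetical stand-in for kafka.Error.
type clientError struct {
	reason string
}

func (c clientError) Error() string { return c.reason }

// message is a hypothetical stand-in for kafka.Message; err plays the role
// of TopicPartition.Error.
type message struct {
	value string
	err   error
}

// triage handles an event based on its type. Inside each case, e already
// has the concrete type, so no further assertion is needed.
func triage(ev event) string {
	switch e := ev.(type) {
	case *message:
		if e.err != nil {
			return fmt.Sprintf("failed: %s (%v)", e.value, e.err)
		}
		return "delivered: " + e.value
	case clientError:
		return "error: " + e.reason
	default:
		return fmt.Sprintf("unexpected: %v", e)
	}
}

func main() {
	fmt.Println(triage(&message{value: "Hello world"}))
	fmt.Println(triage(&message{value: "Hello world", err: clientError{"timed out"}}))
	fmt.Println(triage(clientError{"broker unreachable"}))
	fmt.Println(triage(nil))
}
```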

Now when we produce a message successfully we receive a kafka.Message with a nil value in TopicPartition.Error and the offset of the produced message in TopicPartition.Offset:

✅ Message 'Hello world' delivered to topic 'test_topic_02' (partition 0 at offset 0)

And if there’s a problem we get full details of it

☠️ Uh oh, caught an error:
	foobar:9092/1: Failed to resolve 'foobar:9092': nodename nor servname provided, or not known (after 64ms in state CONNECT)
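I haven't shown the second option from the list above (a dedicated delivery report channel), so here's a rough sketch of its shape. In the real client the channel would be passed as the second argument to Produce (the nil in the calls in this post); everything below (producer, produce, report) is a made-up stand-in that "delivers" instantly, so it shows the pattern and not the library.

```go
package main

import "fmt"

// report is a hypothetical stand-in for the delivery report that the real
// client would send back as a *kafka.Message.
type report struct {
	value  string
	offset int
	err    error
}

// producer is a toy stand-in for kafka.Producer: it "delivers" instantly
// and hands out offsets in order.
type producer struct {
	nextOffset int
}

// produce returns immediately. The delivery report for this message goes
// to the caller's own channel rather than to a shared events channel.
func (p *producer) produce(value string, reports chan<- report) {
	r := report{value: value, offset: p.nextOffset}
	p.nextOffset++
	go func() { reports <- r }()
}

func main() {
	p := &producer{}
	reports := make(chan report)

	p.produce("Hello world", reports)

	// Block until we know what happened to this particular message.
	r := <-reports
	if r.err != nil {
		fmt.Printf("Failed to send '%v': %v\n", r.value, r.err)
	} else {
		fmt.Printf("Message '%v' delivered at offset %d\n", r.value, r.offset)
	}
}
```

With this approach the delivery reports stay out of the shared events channel, which would then only need checking for errors.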

Phantom events

If you run the code as-is you’ll notice you get this

✅ Message 'Hello world' delivered to topic 'test_topic_02' (partition 0 at offset 3)

--
✨ All messages flushed from the queue
Got an event that's not a Message or Error 👻
	<nil>
Got an event that's not a Message or Error 👻
	<nil>
Got an event that's not a Message or Error 👻
	<nil>
Got an event that's not a Message or Error 👻
	<nil>
Got an event that's not a Message or Error 👻
	<nil>
…

After Close() is called, our Go Routine event handler is still receiving events. We don’t want to be doing this (if the Producer is closed, then its Events() channel is meaningless), so use this pattern (inspired by this code) to avoid it:

// For signalling termination from main to go-routine
termChan := make(chan bool, 1)
// For signalling that termination is done from go-routine to main
doneChan := make(chan bool)

go func() {
    doTerm := false
    for !doTerm {
        select {

            // channels that we're listening to

        case <-termChan:
            doTerm = true
        }
    }

    close(doneChan)
}()

// …

// We're ready to finish
termChan <- true
// wait for go-routine to terminate
<-doneChan
// Now we can exit
p.Close()
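My understanding of where the phantom events come from (an assumption on my part, but it fits the endless stream of nils): receiving from a closed Go channel never blocks, it just returns the zero value every time. The sketch below shows that behaviour with a plain channel, followed by a runnable version of the termination handshake. listen and its parameters are names I've made up for the illustration.

```go
package main

import "fmt"

// listen handles events until a value arrives on term, then closes done.
// The count of handled events is written to *handled before done is
// closed, so it is safe to read once <-done has returned.
func listen(events <-chan string, term <-chan bool, done chan<- struct{}, handled *int) {
	defer close(done)
	for {
		select {
		case <-events:
			*handled++
		case <-term:
			return
		}
	}
}

func main() {
	// Part 1: a closed channel yields the zero value, immediately, forever.
	closed := make(chan interface{})
	close(closed)
	ev, ok := <-closed
	fmt.Println(ev, ok) // ok is false because the channel is closed

	// Part 2: stop the listener before tearing anything down.
	events := make(chan string)
	term := make(chan bool, 1)
	done := make(chan struct{})
	handled := 0
	go listen(events, term, done, &handled)

	events <- "delivery report" // unbuffered, so the listener has taken it
	term <- true                // ask the listener to stop
	<-done                      // wait until it has
	fmt.Println("events handled:", handled)
}
```

The second value returned by a receive (ok above) is another way to spot a closed channel from inside the handler.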

Make sure you Flush()

Once we’ve sent our message to the Producer, we get control back straight away, because it’s an asynchronous process. If we don’t put anything else in place the code will run on through to the Close() and exit. We want to know whether all the messages were sent successfully or not. To do this we use the Flush() function, passing a timeout in milliseconds for how long we’ll wait before considering the outstanding messages to have failed.

// Flush the Producer queue
if r := p.Flush(10000); r > 0 {
    fmt.Printf("\n--\n⚠️ Failed to flush all messages after 10 seconds. %d message(s) remain\n", r)
} else {
    fmt.Println("\n--\n✨ All messages flushed from the queue")
}

With this in place we get a confirmation on exit of success:

✨ All messages flushed from the queue

or failure:

⚠️ Failed to flush all messages after 10 seconds. 1 message(s) remain
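To get a feel for what "wait until the queue is empty or the timeout expires, then report what's left" means, here's a toy model. It is only a sketch of the idea: queue, add, delivered and flush are names I've made up, and the real client will do this differently under the covers.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// queue is a toy stand-in for the producer's internal queue. Only the
// number of messages awaiting delivery is modelled.
type queue struct {
	outstanding int64
}

// add records n messages as waiting to be delivered.
func (q *queue) add(n int64) { atomic.AddInt64(&q.outstanding, n) }

// delivered records that one message has been delivered.
func (q *queue) delivered() { atomic.AddInt64(&q.outstanding, -1) }

// flush waits up to timeoutMs milliseconds for the queue to empty and
// returns the number of messages still outstanding.
func (q *queue) flush(timeoutMs int) int {
	deadline := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
	for {
		n := atomic.LoadInt64(&q.outstanding)
		if n == 0 || !time.Now().Before(deadline) {
			return int(n)
		}
		time.Sleep(time.Millisecond)
	}
}

func main() {
	q := &queue{}

	// A message that gets delivered shortly after we start flushing.
	q.add(1)
	go func() {
		time.Sleep(20 * time.Millisecond)
		q.delivered()
	}()
	fmt.Println("remaining after first flush:", q.flush(1000))

	// A message that nobody will ever deliver (think: broker is down).
	q.add(1)
	fmt.Println("remaining after second flush:", q.flush(50))
}
```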

The finished result

package main

import (
	"fmt"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	// --
	// The topic is passed as a pointer to the Producer, so we can't
	// use a hard-coded literal. And a variable is a nicer way to do
	// it anyway ;-)
	topic := "test_topic_02"

	// --
	// Create Producer instance
	// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewProducer

	// Store the config
	c := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092"}

	// Variable p holds the new Producer instance.
	p, e := kafka.NewProducer(&c)

	// Check for errors in creating the Producer
	if e != nil {
		if ke, ok := e.(kafka.Error); ok {
			switch ec := ke.Code(); ec {
			case kafka.ErrInvalidArg:
				fmt.Printf("😢 Can't create the producer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
			default:
				fmt.Printf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)
			}
		} else {
			// It's not a kafka.Error
			fmt.Printf("😢 Oh noes, there's a generic error creating the Producer! %v", e.Error())
		}

	} else {

		// For signalling termination from main to go-routine
		termChan := make(chan bool, 1)
		// For signalling that termination is done from go-routine to main
		doneChan := make(chan bool)

		// --
		// Send a message using Produce()
		// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#Producer.Produce
		//
		// Build the message
		m := kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic},
			Value: []byte("Hello world")}

		// Handle any events that we get
		go func() {
			doTerm := false
			for !doTerm {
				// The `select` blocks until one of the `case` conditions
				// are met - therefore we run it in a Go Routine.
				select {
				case ev := <-p.Events():
					// Look at the type of Event we've received
					switch ev.(type) {

					case *kafka.Message:
						// It's a delivery report
						km := ev.(*kafka.Message)
						if km.TopicPartition.Error != nil {
							fmt.Printf("☠️ Failed to send message '%v' to topic '%v'\n\tErr: %v",
								string(km.Value),
								string(*km.TopicPartition.Topic),
								km.TopicPartition.Error)
						} else {
							fmt.Printf("✅ Message '%v' delivered to topic '%v' (partition %d at offset %d)\n",
								string(km.Value),
								string(*km.TopicPartition.Topic),
								km.TopicPartition.Partition,
								km.TopicPartition.Offset)
						}

					case kafka.Error:
						// It's an error
						em := ev.(kafka.Error)
						fmt.Printf("☠️ Uh oh, caught an error:\n\t%v\n", em)
					default:
						// It's not anything we were expecting
						fmt.Printf("Got an event that's not a Message or Error 👻\n\t%v\n", ev)

					}
				case <-termChan:
					doTerm = true

				}
			}
			close(doneChan)
		}()
		// Produce the message
		if e := p.Produce(&m, nil); e != nil {
			fmt.Printf("😢 Darn, there's an error producing the message! %v", e.Error())
		}

		// --
		// Flush the Producer queue
		t := 10000
		if r := p.Flush(t); r > 0 {
			fmt.Printf("\n--\n⚠️ Failed to flush all messages after %d milliseconds. %d message(s) remain\n", t, r)
		} else {
			fmt.Println("\n--\n✨ All messages flushed from the queue")
		}
		// --
		// Stop listening to events and close the producer
		// We're ready to finish
		termChan <- true
		// wait for go-routine to terminate
		<-doneChan
		// Now we can exit
		p.Close()

	}

}

📺 More Episodes…

  • Kafka and Go

    • S02E00 - Kafka and Go

    • S02E01 - My First Kafka Go Producer

    • S02E02 - Adding error handling to the Producer

    • S02E03 - Kafka Go Consumer (Channel-based)

    • S02E04 - Kafka Go Consumer (Function-based)

    • S02E05 - Kafka Go AdminClient

    • S02E06 - Putting the Producer in a function and handling errors in a Go routine

    • S02E07 - Splitting Go code into separate source files and building a binary executable

    • S02E08 - Checking Kafka advertised.listeners with Go

    • S02E09 - Processing chunked responses before EOF is reached

  • Learning Go

    • S01E00 - Background

    • S01E01 - Pointers

    • S01E02 - Slices

    • S01E03 - Maps

    • S01E04 - Function Closures

    • S01E05 - Interfaces

    • S01E06 - Errors

    • S01E07 - Readers

    • S01E08 - Images

    • S01E09 - Concurrency (Channels, Goroutines)

    • S01E10 - Concurrency (Web Crawler)


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.
