rmoff's random ramblings
about talks

Learning Golang (some rough notes) - S02E06 - Putting the Producer in a function and handling errors in a Go routine

Published Jul 15, 2020 by in Go, Golang, Kafka, Kafka Producer API, Go Routine at https://preview.rmoff.net/2020/07/15/learning-golang-some-rough-notes-s02e06-putting-the-producer-in-a-function-and-handling-errors-in-a-go-routine/

When I set out to learn Go one of the aims I had in mind was to write a version of this little Python utility which accompanies a blog I wrote recently about understanding and diagnosing problems with Kafka advertised listeners. Having successfully got Producer, Consumer, and AdminClient API examples working, it is now time to turn to that task.

I’m quite keen to figure out how to do this properly and take the opportunity to learn. It would be easy enough to copy & paste all my snippets together, ignore any error handling, and check the task off as done. But since I don’t have a strong background in coding, now seems a good chance to try and instill a little bit of rigour in what I’m doing.

Command line arguments 🔗

Instead of hard coding the broker host and port, I want to be able to pass this as a commandline argument. This is easy enough using the OS package, which behaves very similar to the Python equivalent:

import (
	"os"
)

func main() {

	// Read the first commandline argument to get the broker details
	broker := os.Args[1]
	fmt.Printf("Broker: %v", broker)

I’m using VSCode for my IDE as it works very nicely with Go - both for writing code, and debugging it. To pass an argument to the command line as part of debugging go to Run → Open Configurations and in the JSON file set the argument(s) that you want to pass:

"args": ["localhost:9092"]

This works fine when the code is run

Broker: localhost:9092

But what if the user doesn’t specify the required commandline arguments?

panic: runtime error: index out of range [1] with length 1

goroutine 1 [running]:
main.main()
	/Users/rmoff/git/rmoff-blog/content/code/go/kafka/producer_function/producer_function.go:15 +0x24e

So we need a bit of care here, and check for the length too

var broker string
if len(os.Args) == 2 {
    broker = os.Args[1]
} else {
    fmt.Println("(No broker specifed on commandline; defaulting to localhost:9092)")
    broker = "localhost:9092"
}
fmt.Printf("Broker: %v", broker)

Functions 🔗

The main() code is going to look something like this:

// Do init stuff to set vars etc
// …

// Create AdminClient connection to check metadata
doAdmin(broker)
// Produce message
doProduce(broker,topic)
// Consume message
doConsume(broker,topic)
// fin.

Each one is dependent on the other, so we need to know if there was an error…

Errors 🔗

Following the same pattern as I explored here I’m expecting to have something that looks like this:

// Do init stuff to set vars etc
// …

// Create AdminClient connection to check metadata
if e := doAdmin(broker); e != nil {
    fmt.Printf("There was a problem with AdminClient :-(\n%v", e)
} else {
    // Produce message
    if e := doProduce(broker, topic); e != nil {
        fmt.Printf("There was a problem calling the producer :-(\n%v", e)
    } else {
        // Consume message
        if e := doConsume(broker, topic); e != nil {
            fmt.Printf("There was a problem calling the consumer :-(\n%v", e)
        }
    }
}

// fin.

To do this, each function needs to return an error, so the function looks like this:

func doProduce(broker, topic string) error {
    // If you hit an error then
    return errors.New("OH NO! THERE WAS AN ERROR")

    // assuming everything has gone ok return no error
    return nil
}

Where we were previously dumping messages to the output:

fmt.Printf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)

We now want to take this and pass it back as the error. Since errors.New() takes a string it makes sense to replace our existing fmt.Printf with fmt.Sprintf and pass this to errors.New():

return errors.New(fmt.Sprintf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e))

The Go linter in VSCode is brilliant here as it actively tells you this is not the best way to do it, with some nice orange underlining:

error01

If you hover over it you get a nice tip of how to write the code better:

error02

So, instead of

return errors.New(fmt.Sprintf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e))

We just replace

fmt.Printf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)

with

return fmt.Errorf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v\n", ec, e)

Error in Go routines 🔗

This all works, except in the Go routine within the function. In the case of the Producer code the Go routine is there to handle events such as message delivery reports, errors, etc. Since a Go routine is a function itself with no return variables, then we can’t return an error from within it. If you try to, you get this:

too many arguments to return
	have (error)
	want ()

Now, maybe the code in your function will catch an error as a side effect of the error thrown in the Go routine, but it’s not great to rely on that happening. Instead we can provide a channel that the Go routine can write to if there’s an error, and then check that from our parent function and return an error if we find one. Something like this:

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	if e := doThisThing(); e != nil {
		fmt.Printf("doThisThing failed.\n%v", e)
	} else {
		fmt.Printf("doThisThing worked.\n")

	}
}

func doThisThing() error {

	ec := make(chan string, 8)

	go func() {

		// If we're all good then do stuff
		// …

		// If we hit an error then log an error
		ec <- fmt.Sprintf("here is an error from the go routine :(\n")
		ec <- fmt.Sprintf("here is another error from the go routine :(\n")
		close(ec)
	}()

	// Do all our stuff in the function that we need to
	// …
	time.Sleep(2 * time.Second)
	// pretend we're doing stuff
	// …

	// When we're ready to return, check if the go routine has sent errors
	// Note that we're relying on the Go routine to close the channel, otherwise
	// we deadlock.
	// If there are no errors then the channel is simply closed and we read no values.
	done := false
	var e string
	for !done {
		if t, o := <-ec; o == false {
			// o is false if we've read all the values and the channel is closed
			// If that's the case, then we're done here
			done = true
		} else {
			// We've read a value so let's concatenate it with the others
			// that we've got
			e += t
		}
	}

	if len(e) > 0 {
		// If we've got any errors, then return an error to the caller
		return errors.New(e)
	}

	// assuming everything has gone ok return no error
	return nil

}

When run this looks like:

doThisThing failed.
here is an error from the go routine :(
here is another error from the go routine :(

The Producer 🔗

With this error handling in place, I can now call my doProduce function and get an error (or not) back from it successfully:

  • It works!

    ℹ️ No broker specified on commandline; defaulting to localhost:9092
    
    Broker: localhost:9092
    
    --
    ✨ All messages flushed from the queue
    ✅ Message 'foo / Thu, 16 Jul 2020 00:05:57 +0100' delivered to topic 'rmoff_test_00' (partition 0 at offset 11)
    
    -=-=
    Wrapping up…
    👋 … and we're done.
    Oh joy! Oh rejoice! Calling the producer worked *just fine*.
  • It doesn’t!

    ℹ️ No broker specified on commandline; defaulting to localhost:9092
    
    Broker: localhost:9092
    -=-=
    Wrapping up…
    ❌ … returning an error
    
    There was a problem calling the producer :-(
    
    **☠️ Uh oh, caught an error:
    	localhost:9092/bootstrap: Connect to ipv6#[::1]:9092 failed: Connection refused (after 3ms in state CONNECT)
    
    **☠️ Uh oh, caught an error:
    	1/1 brokers are down
    
    --
    ⚠️ Failed to flush all messages after 1000 milliseconds. 1 message(s) remain

Here’s the full code, and a Docker Compose you can use to try it with.

package main

import (
	"errors"
	"fmt"
	"os"
	"time"

	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

func main() {

	// Read the first commandline argument to get the broker details
	var broker string
	if len(os.Args) == 2 {
		broker = os.Args[1]
	} else {
		fmt.Printf("ℹ️ No broker specified on commandline; defaulting to localhost:9092\n\n")
		broker = "localhost:9092"
	}
	fmt.Printf("Broker: %v\n", broker)

	// Set the topic name that we'll use
	topic := "rmoff_test_00"

	// Create Admin Connection
	// doAdmin(broker)
	// Produce message
	if e := doProduce(broker, topic); e != nil {
		fmt.Printf("\nThere was a problem calling the producer :-(\n%v", e)
	} else {
		fmt.Println("Oh joy! Oh rejoice! Calling the producer worked *just fine*.")
	}
	// Consume message
	// doConsume()
	// fin.

}

func doProduce(broker, topic string) error {

	// --
	// Create Producer instance
	// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#NewProducer

	// Store the config
	c := kafka.ConfigMap{
		"bootstrap.servers": broker}

	// Check for errors in creating the Producer
	if p, e := kafka.NewProducer(&c); e != nil {
		if ke, ok := e.(kafka.Error); ok == true {
			switch ec := ke.Code(); ec {
			case kafka.ErrInvalidArg:
				return fmt.Errorf("😢 Can't create the producer because you've configured it wrong (code: %d)!\n\t%v\n\nTo see the configuration options, refer to https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md", ec, e)
			default:
				return fmt.Errorf("😢 Can't create the producer (Kafka error code %d)\n\tError: %v", ec, e)
			}
		} else {
			// It's not a kafka.Error
			return fmt.Errorf("😢 Oh noes, there's a generic error creating the Producer! %v", e.Error())
		}

	} else {

		defer p.Close()

		// For signalling termination from main to go-routine
		termChan := make(chan bool, 1)
		// For signalling that termination is done from go-routine to main
		doneChan := make(chan bool)
		// For capturing errors from the go-routine
		errorChan := make(chan string, 8)

		// --
		// Send a message using Produce()
		// https://docs.confluent.io/current/clients/confluent-kafka-go/index.html#Producer.Produce
		//
		// Build the message
		m := kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic},
			Value: []byte(fmt.Sprintf("foo / %v",
				time.Now().Format(time.RFC1123Z)))}

		// Handle any events that we get
		go func() {
			doTerm := false
			for !doTerm {
				// The `select` blocks until one of the `case` conditions
				// are met - therefore we run it in a Go Routine.
				select {
				case ev := <-p.Events():
					// Look at the type of Event we've received
					switch ev.(type) {

					case *kafka.Message:
						// It's a delivery report
						km := ev.(*kafka.Message)
						if km.TopicPartition.Error != nil {
							errorChan <- fmt.Sprintf("\n**☠️ Failed to send message '%v' to topic '%v'\n\tErr: %v",
								string(km.Value),
								string(*km.TopicPartition.Topic),
								km.TopicPartition.Error)

						} else {
							fmt.Printf("✅ Message '%v' delivered to topic '%v' (partition %d at offset %d)\n",
								string(km.Value),
								string(*km.TopicPartition.Topic),
								km.TopicPartition.Partition,
								km.TopicPartition.Offset)
						}

					case kafka.Error:
						// It's an error
						em := ev.(kafka.Error)
						errorChan <- fmt.Sprintf("\n**☠️ Uh oh, caught an error:\n\t%v\n", em)

					default:
						// It's not anything we were expecting
						errorChan <- fmt.Sprintf("\n**Got an event that's not a Message or Error 👻\n\t%v\n", ev)

					}
				case <-termChan:
					doTerm = true

				}
			}
			close(errorChan)
			close(doneChan)
		}()

		// Produce the message
		if e := p.Produce(&m, nil); e != nil {
			errorChan <- fmt.Sprintf("😢 Darn, there's an error producing the message! %v", e.Error())
		}

		// --
		// Flush the Producer queue
		t := 1000
		if r := p.Flush(t); r > 0 {
			errorChan <- fmt.Sprintf("\n--\n⚠️ Failed to flush all messages after %d milliseconds. %d message(s) remain", t, r)

		} else {
			fmt.Println("\n--\n✨ All messages flushed from the queue")
		}
		// --
		// Stop listening to events and close the producer
		// We're ready to finish
		termChan <- true
		// wait for go-routine to terminate
		<-doneChan
		// Now we can get ready to exit
		fmt.Printf("\n-=-=\nWrapping up…\n")

		// When we're ready to return, check if the go routine has sent errors
		// Note that we're relying on the Go routine to close the channel, otherwise
		// we deadlock.
		// If there are no errors then the channel is simply closed and we read no values.
		done := false
		var e string
		for !done {
			if t, o := <-errorChan; o == false {
				// o is false if we've read all the values and the channel is closed
				// If that's the case, then we're done here
				done = true
			} else {
				// We've read a value so let's concatenate it with the others
				// that we've got
				e += t
			}
		}

		if len(e) > 0 {
			// If we've got any errors, then return an error to the caller

			fmt.Printf("❌ … returning an error\n")
			return errors.New(e)
		}

		// assuming everything has gone ok return no error
		fmt.Printf("👋 … and we're done.\n")
		return nil

	}

}

📺 More Episodes… 🔗

  • Kafka and Go

    • S02E00 - Kafka and Go

    • S02E01 - My First Kafka Go Producer

    • S02E02 - Adding error handling to the Producer

    • S02E03 - Kafka Go Consumer (Channel-based)

    • S02E04 - Kafka Go Consumer (Function-based)

    • S02E05 - Kafka Go AdminClient

    • S02E06 - Putting the Producer in a function and handling errors in a Go routine

    • S02E07 - Splitting Go code into separate source files and building a binary executable

    • S02E08 - Checking Kafka advertised.listeners with Go

    • S02E09 - Processing chunked responses before EOF is reached

  • Learning Go

    • S01E00 - Background

    • S01E01 - Pointers

    • S01E02 - Slices

    • S01E03 - Maps

    • S01E04 - Function Closures

    • S01E05 - Interfaces

    • S01E06 - Errors

    • S01E07 - Readers

    • S01E08 - Images

    • S01E09 - Concurrency (Channels, Goroutines)

    • S01E10 - Concurrency (Web Crawler)


Robin Moffatt

Robin Moffatt works on the DevRel team at Confluent. He likes writing about himself in the third person, eating good breakfasts, and drinking good beer.

Story logo

© 2025