Skip to content
Snippets Groups Projects
Commit 15990eb3 authored by Anthony Michel's avatar Anthony Michel
Browse files

Merge branch 'kafka-producer-general' into 'master'

Moving kafkaproducer for email into general packages

See merge request zegal/back-end/general!11
parents 1cdf0554 7b568159
No related branches found
Tags v0.0.9.01
1 merge request!11Moving kafkaproducer for email into general packages
......@@ -74,6 +74,21 @@
- func Close()
- func listen(f func(string), preMarkOffset bool) // Should run this as goroutines
-------------------------------------
## Package - kafkaproducer
### description:
- Initialise a connection to the kafka brokers and a producer to specific topic
- At the moment only producing events for the email server is in place, but we can extend it to produce to other topics by adding params on Init and new produce functions
### dependency:
- github.com/segmentio/kafka-go
### setup package:
- func Init(broker string, topic string) *DataHolder
### support function:
- func SendEmailEvent(emailEvent EmailEvent) error // Will send a kafka message to the email server for it to send the email
-------------------------------------
## Package - logger
### dependency:
......
package kafkaproducer
import (
"context"
"encoding/json"
"time"
"git.dragonlaw.com.hk/go/general/logger"
kafka "github.com/segmentio/kafka-go"
)
var (
	// Connection struct storing connection information and producer.
	// Package-level shared state: populated by Init and read by
	// SendEmailEvent; starts as an empty holder with a nil Writer.
	Connection = &DataHolder{}
)

// DataHolder structure holding the kafka connection information and producer initialized
type DataHolder struct {
	BrokerURL string        // broker address passed to Init
	Topic     string        // topic every message is produced to
	Writer    *kafka.Writer // producer; lazily re-created by SendEmailEvent when nil
}
// getKafkaWriter builds a kafka.Writer that publishes to the given topic on
// the given broker, distributing messages with the least-bytes balancer.
func getKafkaWriter(kafkaURL, topic string) *kafka.Writer {
	cfg := kafka.WriterConfig{
		Brokers:  []string{kafkaURL},
		Topic:    topic,
		Balancer: &kafka.LeastBytes{},
	}
	return kafka.NewWriter(cfg)
}
// Init function that create a unique producer for the service
func Init(broker string, topic string) *DataHolder {
// Store setting
Connection = &DataHolder{
BrokerURL: broker,
Topic: topic,
}
Connection.Writer = getKafkaWriter(broker, topic)
logger.Info("Kafka Email Init", "Kafka Producer initialised for email event",
logger.Fields{"broker": broker, "topic": Connection.Topic})
return Connection
}
// EmailEvent is the struct representing the event to be sent to kafka to trigger an email.
// Note: SendEmailEvent overwrites Msg and Time before publishing, so callers
// need not set them.
type EmailEvent struct {
	Msg            string                 `json:"msg"`                    // routing marker; forced to "email-server:email:send" by SendEmailEvent
	TemplateName   string                 `json:"template_name"`          // email template the consumer should render
	RecipientEmail string                 `json:"to_email"`               // also used as the kafka message key for per-recipient ordering
	Email          string                 `json:"email"`                  // NOTE(review): relationship to RecipientEmail not visible here — confirm with consumer
	Hostname       string                 `json:"hostname"`
	Name           string                 `json:"name"`
	Time           string                 `json:"time"`                   // set to UTC RFC3339 send time by SendEmailEvent
	RecipientName  string                 `json:"to_name,omitempty"`
	CcEmails       string                 `json:"copy,omitempty"`
	DynamicData    map[string]interface{} `json:"dynamic_data,omitempty"` // free-form template variables
}
// SendEmailEvent transform the structure passed into a kafka message to be consummed by email-server to send an email to client
// emailEvent.Time, emailEvent.Msg, emailEvent.Name are being overridden
// If an error occurs, the error is returned
// SendEmailEvent transforms the structure passed into a kafka message to be
// consumed by email-server, which sends the email to the client.
// emailEvent.Time and emailEvent.Msg are overridden before publishing
// (the previous doc also claimed Name was overridden — it never was).
// If an error occurs, the error is returned.
func SendEmailEvent(emailEvent EmailEvent) error {
	//TODO remove, temp for debugging
	logger.Info("SendEmailEvent", "Pushing email event to kafka topic "+Connection.Topic)
	// Enforce the time and msg field values.
	emailEvent.Time = time.Now().UTC().Format(time.RFC3339)
	emailEvent.Msg = "email-server:email:send" //TODO remove that when all services migrated
	messageValue, err := json.Marshal(emailEvent)
	if err != nil {
		logger.Error("SendEmailEvent", err, "Error while marshaling emailEvent",
			logger.Fields{"emailEvent": emailEvent})
		return err
	}
	msg := kafka.Message{
		// A key is required; keying by recipient ensures all mails sent to
		// one recipient are produced (and hence consumed) in order.
		Key:   []byte(emailEvent.RecipientEmail),
		Value: messageValue,
	}
	if Connection.Writer == nil {
		logger.Info("SendEmailEvent", "Connection Writer is nil !!! trying to re-init",
			logger.Fields{"topic": Connection.Topic, "broker": Connection.BrokerURL})
		Connection.Writer = getKafkaWriter(Connection.BrokerURL, Connection.Topic)
	}
	if err = Connection.Writer.WriteMessages(context.Background(), msg); err != nil {
		logger.Error("SendEmailEvent", err, "Error while writing message to kafka",
			logger.Fields{"data": string(messageValue)})
		return err
	}
	// Remove property Msg to avoid having this log grabbed by fluentd;
	// remove this once fluentd is decommissioned.
	emailEvent.Msg = ""
	trimmedMessageValue, trimErr := json.Marshal(emailEvent)
	if trimErr != nil {
		// Best effort only: the event was already published successfully,
		// so log without the payload instead of failing the whole call.
		logger.Info("SendEmailEvent", "Pushing email event to kafka topic "+Connection.Topic)
		return nil
	}
	logger.Info("SendEmailEvent", "Pushing email event to kafka topic "+Connection.Topic,
		logger.Fields{"data": string(trimmedMessageValue)})
	return nil
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment