Apa Itu Event Bus ?
Event bus adalah sebuah mekanisme yg bisa dipakai untuk berkomunikasi antar komponen tanpa saling tahu satu sama lain. Kalau bisa saya bilang ini juga bagian pub-sub pattern. Lalu, apa kegunaan dari event bus ini? Kegunaanya adalah untuk decoupled antar komponen sehingga tidak saling bergantung secara langsung. Keuntungan lainnya adalah, kita bisa membuat monolith rasa microservice. Dalam arsitektur microservice, salah satu pattern yg sering dipakai adalah pubsub, dan kita bisa menerapkan pubsub ini dalam arsitektur monolith melalui event bus.
Jika digambarkan event bus itu berbentuk seperti gambar berikut
Dimana ada publisher yang dapat mengirim sebuah pesan ke Event Bus dan pesan ini dapat didapatkan oleh banyak subscriber.
Menggunakan event bus permalink
Kita akan coba menggunakan event bus dalam program yang akan kita buat. Dan kita akan membuatnya menggunakan bahasa Go atau Golang.
Jika kalian belum tahu tentang bahasa Golang, bisa cek Dasar Pemrograman Golang.
Yang akan kita jadikan kasus adalah order barang dan payment pada sebuah Online shop. Ketika user membuat sebuah order maka akan dibuat sebuah payment.
+-------------+
|create order +----------+
+-------------+ |
+----v----+
| |
| BUS |
| |
+----^----+
|
|
+--------------+ |
|create payment+---------+
+--------------+
Init project permalink
Oke, pertama kita perlu melakukan ini project. Buat folder project kalian, lalu init module dengan mengetikkan command go mod init shop di folder project. Untuk mempersingkat, saya akan namai module ini sebagai shop.
Versi Go yang digunakan pada saat pembuatan artikel ini adalah
go1.12.17.
Selanjutnya kita akan buat model nya terlebih dahulu. Buat package model, lalu buat file model.go.
├── model
│ └── model.go
Di package model ini, kita membuat tiga buah struct yaitu Product, Order, dan Payment. Sebuah Order dapat memiliki banyak product di dalam nya.
package model
import "fmt"
type Product struct {
ID int64
Price float64
}
type Order struct {
ID int64
ProductIDs []int64
}Lalu sebuah Payment akan memiliki OrderID berikut PaymentStatus nya. PaymentStatus ini bisa dibilang adalah sebuah “enum”, yang memiliki tiga tipe yaitu pending, paid dan canceled.
type Payment struct {
ID int64
OrderID int64
Status PaymentStatus
}
type PaymentStatus int
// PaymentStatus enum
const (
PaymentStatusPending = PaymentStatus(1)
PaymentStatusPaid = PaymentStatus(2)
PaymentStatusCanceled = PaymentStatus(3)
)Selanjutnya, kita akan membuat package service. Ada tiga buah service yang dibuat yaitu ProductService, OrderService dan PaymentService yang semuanya merupakan interface.
└── service
└── service.go
ProductService akan memiliki method yaitu List. Lalu, OrderService memiliki method CreateOrder. Terkahir, PaymentService akan memiliki method CreatePayment.
package service
import "shop/model"
type (
ProductService interface {
List() []model.Product
}
OrderService interface {
CreateOrder(productIDs []int64) *model.Order
}
PaymentService interface {
CreatePayment(orderID int64) *model.Payment
}
)Selanjutnya kita perlu melakukan implementasi dari interface tersebut dengan struct. Pada package service buat file product_service.
└── service
├── product_service.go
Untuk mempersingkat, data product akan kita simpan di dalam field products. Lalu, dengan fungsi NewProductService kita melakukan instansiasi productService sekaligus mengisi field products dengan data dummy.
package service
import (
"shop/model"
"time"
)
type productService struct {
products []model.Product
)
func NewProductService() ProductService {
return &productService{
products: []model.Product{
model.Product{ID: 111, Price: 100.0},
model.Product{ID: 112, Price: 200.0},
model.Product{ID: 113, Price: 300.0},
},
}
}
func (ps *productService) List() []model.Product {
return ps.products
}Lalu, buat file order_service pada package service.
└── service
├── order_service.go
Pada kode ini, terdapat field bus dengan tipe *bus.Bus yang digunakan untuk mempublish event/topic. pacakge yang digunakan adalah github.com/mustafaturan/bus. Argumen ke dua pada fungsi Emit adalah nama topic yg dipublish, di sini kita gunakan nama order.created.
package service
import (
"context"
"time"
"shop/eventbus"
"shop/model"
"github.com/mustafaturan/bus"
log "github.com/sirupsen/logrus"
)
type orderService struct {
bus *bus.Bus
productService ProductService
}
func NewOrderService(ps ProductService, bus *bus.Bus) OrderService {
return &orderService{
productService: ps,
bus: bus,
}
}
func (o *orderService) CreateOrder(productIDs []int64) *model.Order {
order := &model.Order{
ID: time.Now().UnixNano(),
ProductIDs: productIDs,
}
log.Info("create order, productIDs: ", productIDs)
// kita publish atau emit "order.created"
event, err := o.bus.Emit(context.Background(), "order.created", *order)
if err != nil {
log.Error(err)
return
}
return order
}
Service yang dibuat selanjutnya adalah payment_service. Pada service ini, terdapat satu method CreatePayment.
package service
import (
"shop/model"
"time"
)
type (
paymentService struct {
orderService OrderService
}
)
func NewPaymentService(os OrderService) PaymentService {
return &paymentService{
orderService: os,
}
}
func (ps *paymentService) CreatePayment(orderID int64) *model.Payment {
return &model.Payment{
ID: time.Now().UnixNano(),
OrderID: orderID,
Status: model.PaymentStatusPending,
}
}Kemudian, kita akan membuat instansiasi pacakge bus. Fungsi constructor ini saya ambil dari contoh yang diberikan di repo github.com/mustafaturan/bus.
package eventbus
import (
"github.com/mustafaturan/bus"
"github.com/mustafaturan/monoton"
"github.com/mustafaturan/monoton/sequencer"
)
func NewBus() *bus.Bus {
// configure id generator (it doesn't have to be monoton)
node := uint64(1)
initialTime := uint64(1577865600000) // set 2020-01-01 PST as initial time
m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
if err != nil {
log.Fatal(err)
}
// init an id generator
var idGenerator bus.Next = (*m).Next
// create a new bus instance
b, err := bus.NewBus(idGenerator)
if err != nil {
log.Fatal(err)
}
return b
}Oke, selanjutnya kita akan membuat package eventhandler.
├── eventhandler
│ └── handler.go
Event bus ini memiliki sebuah handler yg berupa fungsi. Handler ini gunanya untuk menerima event-event yang diemit ke dalam event bus. Dari event yang diterima kita dapat mengecek jenis topic-nya.
package eventhandler
import (
"shop/eventbus"
"shop/model"
"shop/service"
"github.com/mustafaturan/bus"
log "github.com/sirupsen/logrus"
)
type EventHandler struct {
PaymentService service.PaymentService
}
func (e *EventHandler) HandleOrder(event *bus.Event) {
switch event.Topic {
case "order.created":
log.Infof("recieved event %v", event.ID)
order, ok := event.Data.(model.Order)
if !ok {
return
}
payment := e.PaymentService.CreatePayment(order.ID)
log.Info("create payment", payment)
}
}Sekarang kita akan buat fungsi main, di sini kita akan melakukan wiring service-service yang sudah dibuat.
package main
import (
"os"
"os/signal"
"syscall"
"shop/eventbus"
"shop/eventhandler"
"shop/service"
"github.com/mustafaturan/bus"
log "github.com/sirupsen/logrus"
)
func main() {
handler := &eventhandler.EventHandler{}
bbus := eventbus.NewBus()
bbus.RegisterTopics([]string{"order.created"})
bbus.RegisterHandler("order-channel", &bus.Handler{
Matcher: "order.*", // match untuk semua order
Handle: handler.HandleOrder,
})
productService := service.NewProductService()
orderService := service.NewOrderService(productService, bbus)
paymentSerivce := service.NewPaymentService(orderService)
handler.PaymentService = paymentSerivce
products := productService.List()
orderService.CreateOrder([]int64{products[0].ID})
// kode berikut untuk memblok goroutine utama
sigCh := make(chan os.Signal)
done := make(chan bool)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigCh
log.Info("exiting...")
done <- true
}()
<-done
}Ketika dijalankan maka output dari program akan seperti ini
INFO[0000] create order, productIDs: [1586206522725414000]
INFO[0000] recieved event 0096Tf1h00000001
INFO[0000] create payment{id: 1586206522725562000, order_id: 1586206522725417000, status: pending}
^CINFO[0001] exiting...
0096Tf1h00000001 merupakan id dari event yang diemit, dari urutan log yg muncul.
Jadi, begitulah cara kerja dan penggunaan event bus. Mungkin, di artikel selanjutnya akan dibahas implementasi event bus pada sebuah web service.