Apa Itu Event Bus ?

— 14 minute read

Event bus adalah se­buah mekanisme yg bisa di­pakai un­tuk berko­mu­nikasi an­tar kom­po­nen tanpa sal­ing tahu satu sama lain. Kalau bisa saya bi­lang ini juga bagian pub-sub pat­tern. Lalu, apa kegu­naan dari event bus ini? Kegunaanya adalah un­tuk de­cou­pled an­tar kom­po­nen se­hingga tidak sal­ing bergan­tung se­cara lang­sung. Keuntungan lain­nya adalah, kita bisa mem­buat mono­lith rasa mi­croser­vice. Dalam ar­sitek­tur mi­croser­vice, salah satu pat­tern yg ser­ing di­pakai adalah pub­sub, dan kita bisa men­er­ap­kan pub­sub ini dalam ar­sitek­tur mono­lith melalui event bus.

Jika digam­barkan event bus itu berben­tuk seperti gam­bar berikut

Event Bus
Event Bus

Dimana ada pub­lisher yang da­pat men­girim se­buah pe­san ke Event Bus dan pe­san ini da­pat di­da­p­atkan oleh banyak sub­scriber.

Menggunakan event bus perma­link

Kita akan coba meng­gu­nakan event bus dalam pro­gram yang akan kita buat. Dan kita akan mem­bu­at­nya meng­gu­nakan ba­hasa Go atau Golang.

Jika kalian belum tahu ten­tang ba­hasa Golang, bisa cek Dasar Pemrograman Golang.

Yang akan kita jadikan ka­sus adalah or­der barang dan pay­ment pada se­buah Online shop. Ketika user mem­buat se­buah or­der maka akan dibuat se­buah pay­ment.


+-------------+
|create order +----------+
+-------------+          |
                    +----v----+
                    |         |
                    |   BUS   |
                    |         |
                    +----^----+
                         |
                         |
+--------------+         |
|create payment+---------+
+--------------+

Init pro­ject perma­link

Oke, per­tama kita perlu melakukan ini pro­ject. Buat folder pro­ject kalian, lalu init mod­ule den­gan mengetikkan com­mand go mod init shop di folder pro­ject. Untuk mem­pers­ingkat, saya akan na­mai mod­ule ini se­ba­gai shop.

Versi Go yang di­gu­nakan pada saat pem­bu­atan ar­tikel ini adalah go1.12.17.

Selanjutnya kita akan buat model nya ter­lebih dahulu. Buat pack­age model, lalu buat file model.go.

├── model
│   └── model.go

Di pack­age model ini, kita mem­buat tiga buah struct yaitu Product, Order, dan Payment. Sebuah Order da­pat memi­liki banyak prod­uct di dalam nya.

package model

import "fmt"

type Product struct {
ID int64
Price float64
}

type Order struct {
ID int64
ProductIDs []int64
}

Lalu se­buah Payment akan memi­liki OrderID berikut PaymentStatus nya. PaymentStatus ini bisa di­bi­lang adalah se­buah enum”, yang memi­liki tiga tipe yaitu pending, paid dan canceled.

type Payment struct {
ID int64
OrderID int64
Status PaymentStatus
}

type PaymentStatus int

// PaymentStatus enum
const (
PaymentStatusPending = PaymentStatus(1)
PaymentStatusPaid = PaymentStatus(2)
PaymentStatusCanceled = PaymentStatus(3)
)

Selanjutnya, kita akan mem­buat pack­age service. Ada tiga buah ser­vice yang dibuat yaitu ProductService, OrderService dan PaymentService yang se­muanya meru­pakan interface.

└── service
    └── service.go

ProductService akan memi­liki method yaitu List. Lalu, OrderService memi­liki method CreateOrder. Terkahir, PaymentService akan memi­liki method CreatePayment.

package service

import "shop/model"

type (
ProductService interface {
List() []model.Product
}

OrderService interface {
CreateOrder(productIDs []int64) *model.Order
}

PaymentService interface {
CreatePayment(orderID int64) *model.Payment
}
)

Selanjutnya kita perlu melakukan im­ple­men­tasi dari interface terse­but den­gan struct. Pada pack­age service buat file product_service.

└── service
    ├── product_service.go

Untuk mem­pers­ingkat, data prod­uct akan kita sim­pan di dalam field products. Lalu, den­gan fungsi NewProductService kita melakukan in­stan­si­asi productService sekali­gus mengisi field products den­gan data dummy.

package service

import (
"shop/model"
"time"
)

type productService struct {
products []model.Product
)

func NewProductService() ProductService {
return &productService{
products: []model.Product{
model.Product{ID: 111, Price: 100.0},
model.Product{ID: 112, Price: 200.0},
model.Product{ID: 113, Price: 300.0},
},
}
}

func (ps *productService) List() []model.Product {
return ps.products
}

Lalu, buat file order_service pada pack­age service.

└── service
    ├── order_service.go

Pada kode ini, ter­da­pat field bus den­gan tipe *bus.Bus yang di­gu­nakan un­tuk mem­pub­lish event/​topic. pacakge yang di­gu­nakan adalah github.com/mustafaturan/bus. Argumen ke dua pada fungsi Emit adalah nama topic yg dipub­lish, di sini kita gu­nakan nama order.created.

package service

import (
"context"
"time"

"shop/eventbus"
"shop/model"

"github.com/mustafaturan/bus"
log "github.com/sirupsen/logrus"
)

type orderService struct {
bus *bus.Bus
productService ProductService
}

func NewOrderService(ps ProductService, bus *bus.Bus) OrderService {
return &orderService{
productService: ps,
bus: bus,
}
}

func (o *orderService) CreateOrder(productIDs []int64) *model.Order {
order := &model.Order{
ID: time.Now().UnixNano(),
ProductIDs: productIDs,
}

log.Info("create order, productIDs: ", productIDs)

// kita publish atau emit "order.created"
event, err := o.bus.Emit(context.Background(), "order.created", *order)
if err != nil {
log.Error(err)
return
}

return order
}

Service yang dibuat se­lan­jut­nya adalah payment_service. Pada ser­vice ini, ter­da­pat satu method CreatePayment.

package service

import (
"shop/model"
"time"
)

type (
paymentService struct {
orderService OrderService
}
)

func NewPaymentService(os OrderService) PaymentService {
return &paymentService{
orderService: os,
}
}

func (ps *paymentService) CreatePayment(orderID int64) *model.Payment {
return &model.Payment{
ID: time.Now().UnixNano(),
OrderID: orderID,
Status: model.PaymentStatusPending,
}
}

Kemudian, kita akan mem­buat in­stan­si­asi pacakge bus. Fungsi con­struc­tor ini saya am­bil dari con­toh yang diberikan di repo github.com/mustafaturan/bus.

package eventbus

import (
"github.com/mustafaturan/bus"
"github.com/mustafaturan/monoton"
"github.com/mustafaturan/monoton/sequencer"
)

func NewBus() *bus.Bus {
// configure id generator (it doesn't have to be monoton)
node := uint64(1)
initialTime := uint64(1577865600000) // set 2020-01-01 PST as initial time
m, err := monoton.New(sequencer.NewMillisecond(), node, initialTime)
if err != nil {
log.Fatal(err)
}

// init an id generator
var idGenerator bus.Next = (*m).Next

// create a new bus instance
b, err := bus.NewBus(idGenerator)
if err != nil {
log.Fatal(err)
}

return b
}

Oke, se­lan­jut­nya kita akan mem­buat pack­age eventhandler.

├── eventhandler
│   └── handler.go

Event bus ini memi­liki se­buah han­dler yg berupa fungsi. Handler ini gu­nanya un­tuk mener­ima event-event yang diemit ke dalam event bus. Dari event yang di­ter­ima kita da­pat menge­cek je­nis topic-nya.

package eventhandler

import (
"shop/eventbus"
"shop/model"
"shop/service"

"github.com/mustafaturan/bus"
log "github.com/sirupsen/logrus"
)

type EventHandler struct {
PaymentService service.PaymentService
}

func (e *EventHandler) HandleOrder(event *bus.Event) {
switch event.Topic {
case "order.created":
log.Infof("recieved event %v", event.ID)
order, ok := event.Data.(model.Order)
if !ok {
return
}

payment := e.PaymentService.CreatePayment(order.ID)
log.Info("create payment", payment)
}
}

Sekarang kita akan buat fungsi main, di sini kita akan melakukan wiring ser­vice-ser­vice yang su­dah dibuat.

package main

import (
"os"
"os/signal"
"syscall"

"shop/eventbus"
"shop/eventhandler"
"shop/service"

"github.com/mustafaturan/bus"
log "github.com/sirupsen/logrus"
)

func main() {
handler := &eventhandler.EventHandler{}

bbus := eventbus.NewBus()
bbus.RegisterTopics([]string{"order.created"})
bbus.RegisterHandler("order-channel", &bus.Handler{
Matcher: "order.*", // match untuk semua order
Handle: handler.HandleOrder,
})

productService := service.NewProductService()
orderService := service.NewOrderService(productService, bbus)
paymentSerivce := service.NewPaymentService(orderService)

handler.PaymentService = paymentSerivce

products := productService.List()
orderService.CreateOrder([]int64{products[0].ID})

// kode berikut untuk memblok goroutine utama
sigCh := make(chan os.Signal)
done := make(chan bool)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)
go func() {
<-sigCh
log.Info("exiting...")
done <- true
}()
<-done
}

Ketika di­jalankan maka out­put dari pro­gram akan seperti ini

INFO[0000] create order, productIDs: [1586206522725414000] 
INFO[0000] recieved event 0096Tf1h00000001              
INFO[0000] create payment{id: 1586206522725562000, order_id: 1586206522725417000, status: pending} 
^CINFO[0001] exiting...                                   

0096Tf1h00000001 meru­pakan id dari event yang diemit, dari uru­tan log yg muncul.

Jadi, be­gi­t­u­lah cara kerja dan peng­gu­naan event bus. Mungkin, di ar­tikel se­lan­jut­nya akan diba­has im­ple­men­tasi event bus pada se­buah web ser­vice.