Chuyện mấy con Consumer

Công Nghệ
Chuyện mấy con Consumer
Bài viết được sự cho phép của tác giả Nguyễn Hữu Đồng Vào một ngày đẹp trời, đang ngồi code lan man mình nhận được thông báo từ xếp, một con service tracking của một project cũ đang gây nghẽn database, chuyện là cty đó đang ăn nên làm ra traffic vô site tăng gấp 15 lần, user tương tác nhiều dẫn đến các con service khác cũng tải nhiều hơn, db thì càng ngày càng bị hấp diêm nhiều hơn, ko chỉ đến từ còn service tracking mà còn đến từ nhiều con khác. Tracking thì ngày trước mình code theo mô hình đơn giản, Client gọi API, đẩy message vào 1 cái queue, có tầm 100 con consumer ngồi hốt message ra process rồi ghi vô database. Cloud-Native Microservices Với TIBCO: Khám phá dịch vụ bằng cách sử dụng Consul 26 công cụ và kỹ thuật trong Big Data có thể bạn chưa biết Nhưng đến nước này thì chỉ có thể giảm tải cho database bằng cách giảm số lượng con consumer xuống, hạn chế tải đồng thời cho database. Trước kia code vô nhân đạo nên mình ko handle chuyện cần restart lại App thì mới update config. Tranh thủ lúc đang nhàn rỗi, mấy dự án đang ngồi chờ chốt requirement nên tranh thủ update sửa cho em nó. Nhu cầu cần thiết lúc này là Chỉ cần update file config là app tự động update số consumer Khi deploy new code thì phải graceful shutdown, chờ mỗi consumer xử lí xong message và cho em nó nghỉ ngơi. Viper có sẵn tính năng watch config file nên mình sẽ dùng luôn func watchConsumerCount() { viper.WatchConfig() viper.OnConfigChange(func(e fsnotify.Event) { fmt.Println("Config file changed:", e.Name) n := viper.GetInt("consumer_count") if n != consumer_count { fmt.Println("resize consumer count to...

Bài viết được sự cho phép của tác giả Nguyễn Hữu Đồng

Vào một ngày đẹp trời, đang ngồi code lan man mình nhận được thông báo từ xếp, một con service tracking của một project cũ đang gây nghẽn database, chuyện là cty đó đang ăn nên làm ra traffic vô site tăng gấp 15 lần, user tương tác nhiều dẫn đến các con service khác cũng tải nhiều hơn, db thì càng ngày càng bị hấp diêm nhiều hơn, ko chỉ đến từ còn service tracking mà còn đến từ nhiều con khác.

Tracking thì ngày trước mình code theo mô hình đơn giản, Client gọi API, đẩy message vào 1 cái queue, có tầm 100 con consumer ngồi hốt message ra process rồi ghi vô database.

Nhưng đến nước này thì chỉ có thể giảm tải cho database bằng cách giảm số lượng con consumer xuống, hạn chế tải đồng thời cho database. Trước kia code vô nhân đạo nên mình ko handle chuyện cần restart lại App thì mới update config. Tranh thủ lúc đang nhàn rỗi, mấy dự án đang ngồi chờ chốt requirement nên tranh thủ update sửa cho em nó.

Nhu cầu cần thiết lúc này là

  • Chỉ cần update file config là app tự động update số consumer
  • Khi deploy new code thì phải graceful shutdown, chờ mỗi consumer xử lí xong message và cho em nó nghỉ ngơi.

Viper có sẵn tính năng watch config file nên mình sẽ dùng luôn

func watchConsumerCount() {
  viper.WatchConfig()
  viper.OnConfigChange(func(e fsnotify.Event) {
    fmt.Println("Config file changed:", e.Name)
    n := viper.GetInt("consumer_count")
    if n != consumer_count {
      fmt.Println("resize consumer count to => ", n)
      go ResizeConsumerCount(n)
    }
  })
}

Khi config có sự thay đổi thì nếu là thay đổi giá trị số consumer thì mình tiến hành chạy function ResizeConsumerCount để update lại số lượng consumer

Để quản lí mỗi consumer sẽ thì mình có cái slice mỗi phần từ có type như sau

type ConsumerRoutine struct {
  Id       int         // ID của consumer
  Exit     chan bool   // Channel để nhận lệnh exit
  Channel  *amqp.Channel // AMQP channel  
  Delivery <-chan amqp.Delivery // AMQP delivery chan,để nhận message từ queue
  Done     chan bool    //  Channel báo cáo lên thằng quản lí tao xong hết rồi, chuẩn bị thoát
}

Mỗi consumer khi chạy sẽ ôm cái biến này để giao tiếp với thằng quản lí.

Khởi tạo consumer và các thần phần cần thiết khác.

var amql_conn *amqp.Connection
var amqp_url string
var consumer []*ConsumerRoutine
const max_count = 100
var consumer_count int
var queue_name string
​
func loadConf() {
  viper.SetConfigName("config") 
  viper.SetConfigType("yaml")  
  viper.AddConfigPath(".")      
  err := viper.ReadInConfig()   
  if err != nil {             
    panic(fmt.Errorf("Fatal error config file: %s n", err))
  }
  amqp_url = viper.GetString("amqp_url")
  consumer_count = viper.GetInt("consumer_count")
  queue_name = viper.GetString("queue_name")
}
​
func initAMQPConn() (err error) {
  amql_conn, err = amqp.Dial(amqp_url)
  if err != nil {
    panic(err)
  }
  return err
}
​
func initConsumerTask() {
  consumer = make([]*ConsumerRoutine, max_count)
}
​
func init() {
  loadConf()
  initAMQPConn()
  initConsumerTask()
  go watchConsumerCount()
}

Bắt đầu tạo consumer, mỗi consumer sẽ được chạy trong 1 goroutine và update tình trạng vào cái biến mà nó đươc giao

func startNewConsumer(m *ConsumerRoutine) {
  m.Exit = make(chan bool)
  m.Done = make(chan bool)
  m.Channel, _ = amql_conn.Channel()
  m.Channel.QueueDeclare(queue_name, true, false, false, false, nil)
  m.Delivery, _ = m.Channel.Consume(queue_name, "", false, false, false, false, nil)
  fmt.Println("Consumer : ", m.Id, "started")
  for {
    select {
    case <-m.Exit:       // Lắng nghe lệnh exit từ bên ngoài
      m.Channel.Close()  // Đóng amqp channel
      m.Channel = nil    
      m.Done <- true     // Báo cáo là sẵn sàng thoát
      fmt.Println("Consumer ", m.Id, " ended")
      return
    case msg := <-m.Delivery: // Chờ message từ queue
      // Xử lí message ở đây
      fmt.Println("Consumer ", m.Id, "recived message = ", string(msg.Body))
      // Ack hoặc Reject message, ở đây để luôn có data test nên mình reject luôn :pikatroll:
      msg.Reject(true)
    }
  }
}

Mỗi khi chạy một con consumer mới thì nó sẽ lắng nghe lệnh exit từ bên ngoài thông qua channel m.Exit , nhận message từ queue qua channel m.Delivery . Có message thì sẽ xử lí khi nào xong thì nghe tiếp.

Đến lượt function thay đổi số lượng consumer

func ResizeConsumerCount(n int) {
  // scale up
  // Chỗ này thì thêm phần từ vô slice và sau đó thì chạy một con
  // consumer và truyền biến vào để giao tiếp
  if n >= consumer_count {
    val := consumer_count
    consumer_count = n
    for i := val; i < n; i++ {
      consumer[i] = &ConsumerRoutine{
        Id:     i,
        Exit:   make(chan bool),
        Done:   make(chan bool),
      }
      go startNewConsumer(consumer[i])
    }
    return
  }
  // scale down
  // Giả sự số lượng hiện tại là consumer_count
  // và số consumer sau khi giảm là n thì mình sẽ 
  // gửi lệnh exit cho những con consumer từ n->val-1
  // vì slice được đếm từ 0
  val := consumer_count
  consumer_count = n
  for i := n; i < val; i++ {
    fmt.Println("shuting down consumer : ", i)
    go func(m *ConsumerRoutine) {
      fmt.Println("Exit Chan Pointer = ", &m.Exit)
      m.Exit <- true
    }(consumer[i])
    <-consumer[i].Done
  }
}

Và function main nơi mà mọi thứ bắt đầu.

func main() {
  // Mình chạy consumer_count con consumer từ file config
  for i := 0; i < consumer_count; i++ {
    consumer[i] = &ConsumerRoutine{
      Id: i,
    }
    go startNewConsumer(consumer[i])
  }
  // khởi tạo 1 channel để lắng nghe signal 
  stop := make(chan os.Signal, 1)
  signal.Notify(stop)
  for {
    v := <-stop
    fmt.Println("SIGNAL : ", v.String())
    // nếu signal là hangup thì mình sẽ tắt hết consumer và thoát app
    if v.String() == "hangup" {
      ResizeConsumerCount(0)
      os.Exit(0)
    }
  }
}

Dưới dây là file config.

amqp_url: "amqp://admin:admin@172.31.39.18:5672"
queue_name: "queue:demo"
consumer_count: 10

Build và chạy demo nào

➜  resizeConsumer go build && ./rc
Consumer :  0 started Exit pointer val =  0xc00019a018
Consumer :  1 started Exit pointer val =  0xc00019a058
SIGNAL :  urgent I/O condition
SIGNAL :  urgent I/O condition
SIGNAL :  urgent I/O condition
SIGNAL :  urgent I/O condition
SIGNAL :  urgent I/O condition

Sửa config thì 4 con consumer

amqp_url: "amqp://admin:admin@172.31.39.18:5672"
queue_name: "queue:demo"
consumer_count: 4

Thấy update trong console

SIGNAL :  urgent I/O condition
SIGNAL :  urgent I/O condition
SIGNAL :  urgent I/O condition
SIGNAL :  urgent I/O condition
Config file changed: /Users/vimftw/Desktop/code/resizeConsumer/config.yaml
resize consumer count to =>  4
SIGNAL :  urgent I/O condition
Consumer :  2 started Exit pointer val =  0xc00005a718
Consumer :  3 started Exit pointer val =  0xc00005a758
SIGNAL :  urgent I/O condition

Scale down xuống 1

amqp_url: "amqp://admin:admin@172.31.39.18:5672"
queue_name: "queue:demo"
consumer_count: 1

Output

Config file changed: /Users/vimftw/Desktop/code/resizeConsumer/config.yaml
resize consumer count to =>  1
shuting down consumer :  1
Exit Chan Pointer =  0xc00019a058
have to exit now
Consumer  1  ended
shuting down consumer :  2
Exit Chan Pointer =  0xc00005a718
have to exit now
Consumer  2  ended
shuting down consumer :  3
Exit Chan Pointer =  0xc00005a758
have to exit now
Consumer  3  ended
SIGNAL :  urgent I/O condition
SIGNAL :  urgent I/O condition

Thử tắt luôn con app

SIGNAL :  urgent I/O condition
SIGNAL :  urgent I/O condition
SIGNAL :  hangup
shuting down consumer :  0
Exit Chan Pointer =  0xc00019a018
have to exit now
Consumer  0  ended

Trước mắt thì có vẻ nó hoạt động ổn, mình sẽ test cẩn thận trước khi cho lên production. Lâu quá không viết bài nên mình tranh thủ luôn.
File code mình để ở đây,các bạn thấy hay thì start cho mình với nha https://gist.github.com/dongnguyenltqb/5a4ed0e2dc7d156e3fe6d8dfd064faff

Cảm ơn các bạn đã ghé và đọc bài viết của mình, hihi

Bài viết gốc được đăng tải tại dongnguyenltqb.medium.com

Có thể bạn quan tâm:

Xem thêm Việc làm Developer hấp dẫn trên Station D

Bài viết liên quan

Bộ cài đặt Laravel Installer đã hỗ trợ tích hợp Jetstream

Bộ cài đặt Laravel Installer đã hỗ trợ tích hợp Jetstream

Bài viết được sự cho phép của tác giả Chung Nguyễn Hôm nay, nhóm Laravel đã phát hành một phiên bản chính mới của “ laravel/installer ” bao gồm hỗ trợ khởi động nhanh các dự án Jetstream. Với phiên bản mới này khi bạn chạy laravel new project-name , bạn sẽ nhận được các tùy chọn Jetstream. Ví dụ: API Authentication trong Laravel-Vue SPA sử dụng Jwt-auth Cách sử dụng Laravel với Socket.IO laravel new foo --jet --dev Sau đó, nó sẽ hỏi bạn thích stack Jetstream nào hơn: Which Jetstream stack do you prefer? [0] Livewire [1] inertia > livewire Will your application use teams? (yes/no) [no]: ... Nếu bạn đã cài bộ Laravel Installer, để nâng cấp lên phiên bản mới bạn chạy lệnh: composer global update Một số trường hợp cập nhật bị thất bại, bạn hãy thử, gỡ đi và cài đặt lại nha composer global remove laravel/installer composer global require laravel/installer Bài viết gốc được đăng tải tại chungnguyen.xyz Có thể bạn quan tâm: Cài đặt Laravel Làm thế nào để chạy Sql Server Installation Center sau khi đã cài đặt xong Sql Server? Quản lý các Laravel route gọn hơn và dễ dàng hơn Xem thêm Tuyển dụng lập trình Laravel hấp dẫn trên Station D

By stationd
Principle thiết kế của các sản phẩm nổi tiếng

Principle thiết kế của các sản phẩm nổi tiếng

Tác giả: Lưu Bình An Phù hợp cho các bạn thiết kế nào ko muốn làm code dạo, design dạo nữa, bạn muốn cái gì đó cao hơn ở tầng khái niệm Nếu lập trình chúng ta có các nguyên tắc chung khi viết code như KISS , DRY , thì trong thiết kế cũng có những nguyên tắc chính khi làm việc. Những nguyên tắc này sẽ là kim chỉ nam, nếu có tranh cãi giữa các member trong team, thì cứ đè nguyên tắc này ra mà giải quyết (nghe hơi có mùi cứng nhắc, mình thì thích tùy cơ ứng biến hơn) Tìm các vị trí tuyển dụng designer lương cao cho bạn Nguyên tắc thiết kế của GOV.UK Đây là danh sách của trang GOV.UK Bắt đầu với thứ user cần Làm ít hơn Thiết kế với dữ liệu Làm mọi thứ thật dễ dàng Lặp. Rồi lặp lại lần nữa Dành cho tất cả mọi người Hiểu ngữ cảnh hiện tại Làm dịch vụ digital, không phải làm website Nhất quán, nhưng không hòa tan (phải có chất riêng với thằng khác) Cởi mở, mọi thứ tốt hơn Bao trừu tượng luôn các bạn, trang Gov.uk này cũng có câu tổng quát rất hay Thiết kế tốt là thiết kế có thể sử dụng. Phục vụ cho nhiều đối tượng sử dụng, dễ đọc nhất nhất có thể. Nếu phải từ bỏ đẹp tinh tế – thì cứ bỏ luôn . Chúng ta tạo sản phẩm cho nhu cầu sử dụng, không phải cho người hâm mộ . Chúng ta thiết kế để cả nước sử dụng, không phải những người đã từng sử dụng web. Những người cần dịch vụ của chúng ta nhất là những người đang cảm thấy khó sử dụng dịch...

By stationd
Hiểu về trình duyệt – How browsers work

Hiểu về trình duyệt – How browsers work

Bài viết được sự cho phép của vntesters.com Khi nhìn từ bên ngoài, trình duyệt web giống như một ứng dụng hiển thị những thông tin và tài nguyên từ server lên màn hình người sử dụng, nhưng để làm được công việc hiển thị đó đòi hỏi trình duyệt phải xử lý rất nhiều thông tin và nhiều tầng phía bên dưới. Việc chúng ta (Developers, Testers) tìm hiểu càng sâu tầng bên dưới để nắm được nguyên tắc hoạt động và xử lý của trình duyệt sẽ rất hữu ích trong công việc viết code, sử dụng các tài nguyên cũng như kiểm thử ứng dụng của mình. Cách để npm packages chạy trong browser Câu hỏi phỏng vấn mẹo về React: Component hay element được render trong browser? Khi hiểu được cách thức hoạt động của trình duyệt chúng ta có thể trả lời được rất nhiều câu hỏi như: Tại sao cùng một trang web lại hiển thị khác nhau trên hai trình duyệt? Tại sao chức năng này đang chạy tốt trên trình duyệt Firefox nhưng qua trình duyệt khác lại bị lỗi? Làm sao để trang web hiển thị nội dung nhanh và tối ưu hơn một chút?… Hy vọng sau bài này sẽ giúp các bạn có một cái nhìn rõ hơn cũng như giúp ích được trong công việc hiện tại. 1. Cấu trúc của một trình duyệt Trước tiên chúng ta đi qua cấu trúc, thành phần chung và cơ bản nhất của một trình duyệt web hiện đại, nó sẽ gồm các thành phần (tầng) như sau: Thành phần nằm phía trên là những thành phần gần với tương tác của người dùng, càng phía dưới thì càng sâu và nặng về xử lý dữ liệu hơn tương tác. Nhiệm...

By stationd
Thị trường EdTech Vietnam- Nhiều tiềm năng nhưng còn bị bỏ ngỏ tại Việt Nam

Thị trường EdTech Vietnam- Nhiều tiềm năng nhưng còn bị bỏ ngỏ tại Việt Nam

Lĩnh vực EdTech (ứng dụng công nghệ vào các sản phẩm giáo dục) trên toàn cầu hiện nay đã tương đối phong phú với nhiều tên tuổi lớn phân phối đều trên các hạng mục như Broad Online Learning Platforms (nền tảng cung cấp khóa học online đại chúng – tiêu biểu như Coursera, Udemy, KhanAcademy,…) Learning Management Systems (hệ thống quản lý lớp học – tiêu biểu như Schoology, Edmodo, ClassDojo,…) Next-Gen Study Tools (công cụ hỗ trợ học tập – tiểu biểu như Kahoot!, Lumosity, Curriculet,…) Tech Learning (đào tạo công nghệ – tiêu biểu như Udacity, Codecademy, PluralSight,…), Enterprise Learning (đào tạo trong doanh nghiệp – tiêu biểu như Edcast, ExecOnline, Grovo,..),… Hiện nay thị trường EdTech tại Việt Nam đã đón nhận khoảng đầu tư khoảng 55 triệu đô cho lĩnh vực này nhiều đơn vị nước ngoài đang quan tâm mạnh đến thị trường này ngày càng nhiều hơn. Là một trong những xu hướng phát triển tốt, và có doanh nghiệp đã hoạt động khá lâu trong ngành nêu tại infographic như Topica, nhưng EdTech vẫn chỉ đang trong giai đoạn sơ khai tại Việt Nam. Tại Việt Nam, hệ sinh thái EdTech trong nước vẫn còn rất non trẻ và thiếu vắng nhiều tên tuổi trong các hạng mục như Enterprise Learning (mới chỉ có MANA), School Administration (hệ thống quản lý trường học) hay Search (tìm kiếm, so sánh trường và khóa học),… Với chỉ dưới 5% số dân công sở có sử dụng một trong các dịch vụ giáo dục online, EdTech cho thấy vẫn còn một thị trường rộng lớn đang chờ được khai phá. *** Vừa qua Station D đã công bố Báo cáo Vietnam IT Landscape 2019 đem đến cái nhìn toàn cảnh về các ứng dụng công...

By stationd