Sử dụng Headers Exchange trong RabbitMQ

Công Nghệ
Sử dụng Headers Exchange trong RabbitMQ
Bài viết được sự cho phép của tác giả Giang Phan Trong các bài viết trước, chúng ta đã cùng tìm hiểu về Direct Exchange , Fanout Exchange và Topic Exchange. Trong bài này, tôi sẽ giới thiệu với các bạn một loại exchange rất mạnh mẽ khác của RabbitM là Headers Exchange . Xem thêm Việc làm IT hấp dẫn trên Station D Sử dụng Alternate Exchange trong RabbitMQ Sử dụng binding Exchange to Exchange trong RabbitMQ Flow của một Message trong Headers Exchange Header exchange (amq.headers) được thiết kế để định tuyến với nhiều thuộc tính, để dàng thực hiện dưới dạng header của message hơn là routing key. Header exchange bỏ đi routing key mà thay vào đó định tuyến dựa trên header của message. Trường hợp này, broker cần một hoặc nhiều thông tin từ application developer, cụ thể là, nên quan tâm đến những tin nhắn với tiêu đề nào phù hợp hoặc tất cả chúng. Headers Exchange rất giống với Topic Exchange, nhưng nó định tuyến dựa trên các giá trị header thay vì routing key. Một Message được coi là phù hợp nếu giá trị của header bằng với giá trị được chỉ định khi ràng buộc. Flow của một Message trong Headers Exchange như sau: Một hoặc nhiều Queue được tạo và binding tới một Headers Exchange sử dụng các header property (H). Một Producer sẽ tạo một Message với các header property (MH) và publish tới Exchange . Một Message được Exchange chuyển đến Queue nếu Header H match với Header MH. Consumer đăng ký tới Queue để nhận Message. Có 2 loại matching được sử dụng để kiểm tra một Header của binding queue có match với một header từ message đến: any : tương tự như logic OR, được...

Bài viết được sự cho phép của tác giả Giang Phan 

Trong các bài viết trước, chúng ta đã cùng tìm hiểu về Direct ExchangeFanout Exchange và Topic Exchange. Trong bài này, tôi sẽ giới thiệu với các bạn một loại exchange rất mạnh mẽ khác của RabbitM là Headers Exchange.

Xem thêm Việc làm IT hấp dẫn trên Station D

Flow của một Message trong Headers Exchange

Header exchange (amq.headers) được thiết kế để định tuyến với nhiều thuộc tính, để dàng thực hiện dưới dạng header của message hơn là routing key. Header exchange bỏ đi routing key mà thay vào đó định tuyến dựa trên header của message. Trường hợp này, broker cần một hoặc nhiều thông tin từ application developer, cụ thể là, nên quan tâm đến những tin nhắn với tiêu đề nào phù hợp hoặc tất cả chúng.

Headers Exchange rất giống với Topic Exchange, nhưng nó định tuyến dựa trên các giá trị header thay vì routing key.

Một Message được coi là phù hợp nếu giá trị của header bằng với giá trị được chỉ định khi ràng buộc.

Flow của một Message trong Headers Exchange như sau:

  • Một hoặc nhiều Queue được tạo và binding tới một Headers Exchange sử dụng các header property (H).
  • Một Producer sẽ tạo một Message với các header property (MH) và publish tới Exchange.
  • Một Message được Exchange chuyển đến Queue nếu Header H match với Header MH.
  • Consumer đăng ký tới Queue để nhận Message.

Có 2 loại matching được sử dụng để kiểm tra một Header của binding queue có match với một header từ message đến:

  • any: tương tự như logic OR, được biểu diễn trong các ràng buộc header property là {“x-match“, “any“, …} . Nghĩa là, một Message được gửi tới Exchange phải chứa ít nhất một trong các header mà Queue được liên kết, sau đó Message sẽ được chuyển đến Queue.
  • all: tương tự như logic AND, được biểu diễn trong các ràng buộc header property là {“x-match“, “and“, …} . Nghĩa là, các Message có tất cả các header được liệt kê của nó sẽ được chuyển tiếp đến Queue.

Ví dụ Topic Exchange trong RabbitMQ

Trong ví dụ này, tôi tạo một Headers Exchange có tên GPCoderHeadersExchange, tạo 3 Queue binding tới Headers Exchange này:

  • QDeveloper : Queue này sẽ nhận tất cả message có header là {“dev”, “Developer Channel”} hoặc {“general”, “General Channel”}.
  • QManager : Queue này nhận tất cả message có header là {“dev”, “Developer Channel”} hoặc {“general”, “General Channel”} hoặc {“manager”, “Manager Channel”}.
  • QPublished : Queue này nhận tất cả message có header là {“dev”, “Developer Channel”} và {“access”, “publish”}.

Một số class của chương trình:

  • ConnectionManager : hỗ trợ tạo Connection đến RabbitMQ.
  • HeadersExchangeChannel :  class util hỗ trợ tạo Echange, Queue, binding Queue đến Exchange, publish/ subscribe message, …
  • Constant : định nghĩa constant chứa các thông tin về tên Exchange, Queue.
  • Producer: để gửi Message đến Exchange.
  • Consumer: để nhận Message từ Queue.
  • App: giả lập việc gửi nhận Message thông qua Headers Exchange của RabbitMQ.

ConnectionManager.java

package com.gpcoder.headersexchange;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionManager {

private ConnectionManager() {
super();
}

public static Connection createConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
return factory.newConnection();
}
}

TopicExchangeChannel.java

package com.gpcoder.headersexchange;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Map;

public class HeadersExchangeChannel {

    private String exchangeName;
    private Channel channel;
    private Connection connection;

    public HeadersExchangeChannel(Connection connection, String exchangeName) throws IOException {
        this.exchangeName = exchangeName;
        this.connection = connection;
        this.channel = connection.createChannel();
    }

    public void declareExchange() throws IOException {
        // exchangeDeclare( exchange, builtinExchangeType, durable)
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.HEADERS, true);
    }

    public void declareQueues(String ...queueNames) throws IOException {
        for (String queueName : queueNames) {
            // queueDeclare  - (queueName, durable, exclusive, autoDelete, arguments)
            channel.queueDeclare(queueName, true, false, false, null);
        }
    }

    public void performQueueBinding(String queueName, Map<String, Object> headers) throws IOException {
        // Create bindings - (queue, exchange, routingKey)
        channel.queueBind(queueName, exchangeName, "", headers);
    }

    public void subscribeMessage(String queueName) throws IOException {
        // basicConsume - ( queue, autoAck, deliverCallback, cancelCallback)
        channel.basicConsume(queueName, true, ((consumerTag, message) -> {
            System.out.println("[Received] [" + queueName + "]: " + consumerTag);
            System.out.println("[Received] [" + queueName + "]: " + new String(message.getBody()));
        }), consumerTag -> {
            System.out.println(consumerTag);
        });
    }

    public void publishMessage(String message, Map<String, Object> headers) throws IOException {
        BasicProperties properties = new BasicProperties()
                .builder().headers(headers).build();

        // basicPublish - ( exchange, routingKey, basicProperties, body)
        System.out.println("[Send] [" + headers + "]: " + message);
        channel.basicPublish(exchangeName, "", properties, message.getBytes());
    }
}

Constant.java

package com.gpcoder.headersexchange;

public final class Constant {

// Exchange

public static final String EXCHANGE_NAME = "GPCoderHeadersExchange";

// Queue

public static final String DEV_QUEUE_NAME = "QDeveloper";

public static final String MANAGER_QUEUE_NAME = "QManager";

public static final String PUBLISHED_QUEUE_NAME = "QPublished";

private Constant() {
super();
}
}

Producer.java

package com.gpcoder.headersexchange;

import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.headersexchange.Constant.*;

public class Producer {

private HeadersExchangeChannel channel;

public void start() throws IOException, TimeoutException {
// Create connection
Connection connection = ConnectionManager.createConnection();

// Create channel
channel = new HeadersExchangeChannel(connection, EXCHANGE_NAME);

// Create headers exchange
channel.declareExchange();

// Create headers
Map<String, Object> devHeaders = new HashMap<>();
devHeaders.put("x-match", "any"); // Match any of the header
devHeaders.put("dev", "Developer Channel");
devHeaders.put("general", "General Channel");

Map<String, Object> managerHeaders = new HashMap<>();
managerHeaders.put("x-match", "any"); // Match any of the header
managerHeaders.put("dev", "Developer Channel");
managerHeaders.put("manager", "Manager Channel");
managerHeaders.put("general", "General Channel");

Map<String, Object> publishedHeaders = new HashMap<>();
publishedHeaders.put("x-match", "all"); // Match all of the header
publishedHeaders.put("general", "General Channel");
publishedHeaders.put("access", "publish");

// Create queues
channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, PUBLISHED_QUEUE_NAME);

// Binding queues with headers
channel.performQueueBinding(DEV_QUEUE_NAME, devHeaders);
channel.performQueueBinding(MANAGER_QUEUE_NAME, managerHeaders);
channel.performQueueBinding(PUBLISHED_QUEUE_NAME, publishedHeaders);
}

public void send(String message, Map<String, Object> headers) throws IOException {
// Send message
channel.publishMessage(message, headers);
}
}

Consumer.java

package com.gpcoder.headersexchange;

import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import static com.gpcoder.headersexchange.Constant.*;

public class Consumer {

private HeadersExchangeChannel channel;

public void start() throws IOException, TimeoutException {
// Create connection
Connection connection = ConnectionManager.createConnection();

// Create channel
channel = new HeadersExchangeChannel(connection, EXCHANGE_NAME);

// Create headers exchange
channel.declareExchange();

// Create headers
Map<String, Object> devHeaders = new HashMap<>();
devHeaders.put("x-match", "any"); // Match any of the header
devHeaders.put("dev", "Developer Channel");
devHeaders.put("general", "General Channel");

Map<String, Object> managerHeaders = new HashMap<>();
managerHeaders.put("x-match", "any"); // Match any of the header
managerHeaders.put("dev", "Developer Channel");
managerHeaders.put("manager", "Manager Channel");
managerHeaders.put("general", "General Channel");

Map<String, Object> publishedHeaders = new HashMap<>();
publishedHeaders.put("x-match", "all"); // Match all of the header
publishedHeaders.put("general", "General Channel");
publishedHeaders.put("access", "publish");

// Create queues
channel.declareQueues(DEV_QUEUE_NAME, MANAGER_QUEUE_NAME, PUBLISHED_QUEUE_NAME);

// Binding queues with headers
channel.performQueueBinding(DEV_QUEUE_NAME, devHeaders);
channel.performQueueBinding(MANAGER_QUEUE_NAME, managerHeaders);
channel.performQueueBinding(PUBLISHED_QUEUE_NAME, publishedHeaders);
}

public void subscribe() throws IOException {
// Subscribe message
channel.subscribeMessage(DEV_QUEUE_NAME);
channel.subscribeMessage(MANAGER_QUEUE_NAME);
channel.subscribeMessage(PUBLISHED_QUEUE_NAME);
}

}

App.java

package com.gpcoder.headersexchange;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

public class App {

public static void main(String[] args) throws IOException, TimeoutException {
// Create producers, queues and binding queues to Headers Exchange
Producer producer = new Producer();
producer.start();

// Publish some messages
Map<String, Object> devHeader = new HashMap<>();
devHeader.put("dev", "Developer Channel");
producer.send("[1] Developer message", devHeader);

Map<String, Object> managerHeader = new HashMap<>();
managerHeader.put("manager", "Manager Channel");
producer.send("[2] Manager message", managerHeader);

Map<String, Object> generalHeader = new HashMap<>();
generalHeader.put("general", "General Channel");
producer.send("[3] General message", generalHeader);

Map<String, Object> publishedHeader = new HashMap<>();
publishedHeader.put("general", "General Channel");
publishedHeader.put("access", "publish");
producer.send("[4] Published message", publishedHeader);

// Create consumers, queues and binding queues to Headers Exchange
Consumer consumer = new Consumer();
consumer.start();
consumer.subscribe();
}
}

Output của chương trình:

[Send] [{dev=Developer Channel}]: [1] Developer message
[Send] [{manager=Manager Channel}]: [2] Manager message
[Send] [{general=General Channel}]: [3] General message
[Send] [{general=General Channel, access=publish}]: [4] Published message
[Received] [QDeveloper]: amq.ctag-NEPntH2xJ4411lij29dCyg
[Received] [QDeveloper]: [1] Developer message
[Received] [QDeveloper]: amq.ctag-NEPntH2xJ4411lij29dCyg
[Received] [QDeveloper]: [3] General message
[Received] [QDeveloper]: amq.ctag-NEPntH2xJ4411lij29dCyg
[Received] [QDeveloper]: [4] Published message
[Received] [QManager]: amq.ctag-UBFWB3QQqjgbPNAdyx87NQ
[Received] [QManager]: [1] Developer message
[Received] [QManager]: amq.ctag-UBFWB3QQqjgbPNAdyx87NQ
[Received] [QManager]: [2] Manager message
[Received] [QManager]: amq.ctag-UBFWB3QQqjgbPNAdyx87NQ
[Received] [QManager]: [3] General message
[Received] [QManager]: amq.ctag-UBFWB3QQqjgbPNAdyx87NQ
[Received] [QManager]: [4] Published message
[Received] [QPublished]: amq.ctag-j0yil4i64E5lepsuKdKFdg
[Received] [QPublished]: [4] Published message

Như bạn thấy:

  • developer có thể nhận bất kỳ Message nào có header là {“dev”, “Developer Channel”} hoặc {“general”, “General Channel”}.
  • manager có thể nhận bất kỳ Message nào có header là {“dev”, “Developer Channel”} hoặc {“general”, “General Channel”} hoặc {“manager”, “Manager Channel”}.
  • customer có thể nhận bất kỳ Message nào có header là {“dev”, “Developer Channel”} và {“access”, “publish”}.

Tài liệu tham khảo:

Bài viết liên quan

Bộ cài đặt Laravel Installer đã hỗ trợ tích hợp Jetstream

Bộ cài đặt Laravel Installer đã hỗ trợ tích hợp Jetstream

Bài viết được sự cho phép của tác giả Chung Nguyễn Hôm nay, nhóm Laravel đã phát hành một phiên bản chính mới của “ laravel/installer ” bao gồm hỗ trợ khởi động nhanh các dự án Jetstream. Với phiên bản mới này khi bạn chạy laravel new project-name , bạn sẽ nhận được các tùy chọn Jetstream. Ví dụ: API Authentication trong Laravel-Vue SPA sử dụng Jwt-auth Cách sử dụng Laravel với Socket.IO laravel new foo --jet --dev Sau đó, nó sẽ hỏi bạn thích stack Jetstream nào hơn: Which Jetstream stack do you prefer? [0] Livewire [1] inertia > livewire Will your application use teams? (yes/no) [no]: ... Nếu bạn đã cài bộ Laravel Installer, để nâng cấp lên phiên bản mới bạn chạy lệnh: composer global update Một số trường hợp cập nhật bị thất bại, bạn hãy thử, gỡ đi và cài đặt lại nha composer global remove laravel/installer composer global require laravel/installer Bài viết gốc được đăng tải tại chungnguyen.xyz Có thể bạn quan tâm: Cài đặt Laravel Làm thế nào để chạy Sql Server Installation Center sau khi đã cài đặt xong Sql Server? Quản lý các Laravel route gọn hơn và dễ dàng hơn Xem thêm Tuyển dụng lập trình Laravel hấp dẫn trên Station D

By stationd
Principle thiết kế của các sản phẩm nổi tiếng

Principle thiết kế của các sản phẩm nổi tiếng

Tác giả: Lưu Bình An Phù hợp cho các bạn thiết kế nào ko muốn làm code dạo, design dạo nữa, bạn muốn cái gì đó cao hơn ở tầng khái niệm Nếu lập trình chúng ta có các nguyên tắc chung khi viết code như KISS , DRY , thì trong thiết kế cũng có những nguyên tắc chính khi làm việc. Những nguyên tắc này sẽ là kim chỉ nam, nếu có tranh cãi giữa các member trong team, thì cứ đè nguyên tắc này ra mà giải quyết (nghe hơi có mùi cứng nhắc, mình thì thích tùy cơ ứng biến hơn) Tìm các vị trí tuyển dụng designer lương cao cho bạn Nguyên tắc thiết kế của GOV.UK Đây là danh sách của trang GOV.UK Bắt đầu với thứ user cần Làm ít hơn Thiết kế với dữ liệu Làm mọi thứ thật dễ dàng Lặp. Rồi lặp lại lần nữa Dành cho tất cả mọi người Hiểu ngữ cảnh hiện tại Làm dịch vụ digital, không phải làm website Nhất quán, nhưng không hòa tan (phải có chất riêng với thằng khác) Cởi mở, mọi thứ tốt hơn Bao trừu tượng luôn các bạn, trang Gov.uk này cũng có câu tổng quát rất hay Thiết kế tốt là thiết kế có thể sử dụng. Phục vụ cho nhiều đối tượng sử dụng, dễ đọc nhất nhất có thể. Nếu phải từ bỏ đẹp tinh tế – thì cứ bỏ luôn . Chúng ta tạo sản phẩm cho nhu cầu sử dụng, không phải cho người hâm mộ . Chúng ta thiết kế để cả nước sử dụng, không phải những người đã từng sử dụng web. Những người cần dịch vụ của chúng ta nhất là những người đang cảm thấy khó sử dụng dịch...

By stationd
Hiểu về trình duyệt – How browsers work

Hiểu về trình duyệt – How browsers work

Bài viết được sự cho phép của vntesters.com Khi nhìn từ bên ngoài, trình duyệt web giống như một ứng dụng hiển thị những thông tin và tài nguyên từ server lên màn hình người sử dụng, nhưng để làm được công việc hiển thị đó đòi hỏi trình duyệt phải xử lý rất nhiều thông tin và nhiều tầng phía bên dưới. Việc chúng ta (Developers, Testers) tìm hiểu càng sâu tầng bên dưới để nắm được nguyên tắc hoạt động và xử lý của trình duyệt sẽ rất hữu ích trong công việc viết code, sử dụng các tài nguyên cũng như kiểm thử ứng dụng của mình. Cách để npm packages chạy trong browser Câu hỏi phỏng vấn mẹo về React: Component hay element được render trong browser? Khi hiểu được cách thức hoạt động của trình duyệt chúng ta có thể trả lời được rất nhiều câu hỏi như: Tại sao cùng một trang web lại hiển thị khác nhau trên hai trình duyệt? Tại sao chức năng này đang chạy tốt trên trình duyệt Firefox nhưng qua trình duyệt khác lại bị lỗi? Làm sao để trang web hiển thị nội dung nhanh và tối ưu hơn một chút?… Hy vọng sau bài này sẽ giúp các bạn có một cái nhìn rõ hơn cũng như giúp ích được trong công việc hiện tại. 1. Cấu trúc của một trình duyệt Trước tiên chúng ta đi qua cấu trúc, thành phần chung và cơ bản nhất của một trình duyệt web hiện đại, nó sẽ gồm các thành phần (tầng) như sau: Thành phần nằm phía trên là những thành phần gần với tương tác của người dùng, càng phía dưới thì càng sâu và nặng về xử lý dữ liệu hơn tương tác. Nhiệm...

By stationd
Thị trường EdTech Vietnam- Nhiều tiềm năng nhưng còn bị bỏ ngỏ tại Việt Nam

Thị trường EdTech Vietnam- Nhiều tiềm năng nhưng còn bị bỏ ngỏ tại Việt Nam

Lĩnh vực EdTech (ứng dụng công nghệ vào các sản phẩm giáo dục) trên toàn cầu hiện nay đã tương đối phong phú với nhiều tên tuổi lớn phân phối đều trên các hạng mục như Broad Online Learning Platforms (nền tảng cung cấp khóa học online đại chúng – tiêu biểu như Coursera, Udemy, KhanAcademy,…) Learning Management Systems (hệ thống quản lý lớp học – tiêu biểu như Schoology, Edmodo, ClassDojo,…) Next-Gen Study Tools (công cụ hỗ trợ học tập – tiểu biểu như Kahoot!, Lumosity, Curriculet,…) Tech Learning (đào tạo công nghệ – tiêu biểu như Udacity, Codecademy, PluralSight,…), Enterprise Learning (đào tạo trong doanh nghiệp – tiêu biểu như Edcast, ExecOnline, Grovo,..),… Hiện nay thị trường EdTech tại Việt Nam đã đón nhận khoảng đầu tư khoảng 55 triệu đô cho lĩnh vực này nhiều đơn vị nước ngoài đang quan tâm mạnh đến thị trường này ngày càng nhiều hơn. Là một trong những xu hướng phát triển tốt, và có doanh nghiệp đã hoạt động khá lâu trong ngành nêu tại infographic như Topica, nhưng EdTech vẫn chỉ đang trong giai đoạn sơ khai tại Việt Nam. Tại Việt Nam, hệ sinh thái EdTech trong nước vẫn còn rất non trẻ và thiếu vắng nhiều tên tuổi trong các hạng mục như Enterprise Learning (mới chỉ có MANA), School Administration (hệ thống quản lý trường học) hay Search (tìm kiếm, so sánh trường và khóa học),… Với chỉ dưới 5% số dân công sở có sử dụng một trong các dịch vụ giáo dục online, EdTech cho thấy vẫn còn một thị trường rộng lớn đang chờ được khai phá. *** Vừa qua Station D đã công bố Báo cáo Vietnam IT Landscape 2019 đem đến cái nhìn toàn cảnh về các ứng dụng công...

By stationd