Skip to main content

Protocol Definition

The communication between the YedMQ broker and its plugins is based on Protocol Buffers (Protobuf). All messages are wrapped in a generic ProtocolMessage envelope.

Source File​

The authoritative definition of the plugin protocol can be found in the following file within the YedMQ source repository:

YedMQ/plugin_protocol/proto/yedmq_plugin_protocol.proto

Developers should use this file to generate Protobuf bindings in their preferred programming language.

Message Overview​

ProtocolMessage​

Every IPC frame payload is a ProtocolMessage. It contains metadata and a variant field (using google.protobuf.Any) for the actual request, response, or notification data.

  • type: Indicates if the message is a REQUEST, RESPONSE, NOTIFICATION, EVENT, or ERROR.
  • method: For requests, specifies the action to be performed (e.g., INITIALIZE, AUTHENTICATE).
  • params: An Any field containing the request parameters.
  • result: An Any field containing the successful response data.
  • error: Contains error details if the operation failed.

Key Message Types​

Message TypeDescription
InitializeRequestSent by the broker to start the handshake with a plugin.
InitializeResponseSent by the plugin to provide its capabilities and hooks.
AuthenticateRequestSent by the broker to request client authentication.
AuthenticateResponseSent by the plugin with the authentication result.
AuthorizeRequestSent by the broker to request authorization for a specific action (Connect, Publish, Subscribe).
AuthorizeResponseSent by the plugin with the authorization result.
MqttMessageA structure representing an MQTT message, used in various hooks.
ClientConnectedEventA notification sent when a client successfully connects.
ClientDisconnectedEventA notification sent when a client disconnects.

Full Proto Definition​

Below is the full content of yedmq_plugin_protocol.proto for quick reference:

syntax = "proto3";

package plugin_protocol;

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

// Protocol message body
message ProtocolMessage {
string version = 1; // Protocol version, e.g. "1.0"
MessageType type = 2; // Message type
string id = 3; // Unique message identifier
google.protobuf.Timestamp timestamp = 4; // Timestamp
string source = 5; // Sender identifier
string target = 6; // Receiver identifier

// The following fields are used selectively based on type
optional Method method = 7; // Method type (used for requests)
google.protobuf.Any params = 8; // Parameters (used for requests/notifications)
google.protobuf.Any result = 9; // Result (used for responses)
optional ErrorInfo error = 10; // Error information (used for error responses)

// Extension fields
map<string, string> metadata = 15; // Metadata
}

// Error information
message ErrorInfo {
uint32 code = 1; // error code
string message = 2; // error message
google.protobuf.Struct details = 3; // additional error details
}


// Message type enum
enum MessageType {
MESSAGE_TYPE_UNSPECIFIED = 0;
MESSAGE_TYPE_REQUEST = 1;
MESSAGE_TYPE_RESPONSE = 2;
MESSAGE_TYPE_NOTIFICATION = 3;
MESSAGE_TYPE_EVENT = 4;
MESSAGE_TYPE_ERROR = 5;
MESSAGE_TYPE_BATCH_REQUEST = 6;
MESSAGE_TYPE_BATCH_RESPONSE = 7;
}

enum Method {
METHOD_UNSPECIFIED = 0;

// Lifecycle management (10-19)
METHOD_INITIALIZE = 10;
METHOD_SHUTDOWN = 11;
METHOD_PING = 12;
METHOD_REGISTER_PLUGIN = 13;
METHOD_UNREGISTER_PLUGIN = 14;

// Authentication and authorization (20-29)
METHOD_AUTHENTICATE = 20;
METHOD_AUTHORIZE = 21;
METHOD_GET_ACL = 23;

// MQTT message handling (30-39)
METHOD_ON_MESSAGE_PUBLISH = 30;
METHOD_ON_MESSAGE_SUBSCRIBE = 31;
METHOD_ON_MESSAGE_UNSUBSCRIBE = 32;

// Event notification (50-59)
METHOD_CLIENT_CONNECTED = 50;
METHOD_CLIENT_DISCONNECTED = 51;
METHOD_MESSAGE_PUBLISHED = 52;
METHOD_SUBSCRIPTION_ADDED = 53;
METHOD_SUBSCRIPTION_REMOVED = 54;

// Monitoring (60-69)
METHOD_GET_STATS = 60;
METHOD_RESET_STATS = 61;
}

// Plugin initialization
message InitializeRequest {
google.protobuf.Struct plugin_config = 1; // Plugin configuration
BrokerInfo broker_info = 2; // Broker information
repeated string required_capabilities = 3; // Required capabilities
}


message InitializeResponse {
string status = 1; // Status: "ready", "error"
repeated string capabilities = 2; // Capabilities provided by the plugin
repeated Hook hooks = 3; // Registered hooks
google.protobuf.Struct plugin_info = 4; // Plugin information
string auth_code = 5; // Authentication code for plugin registration
}

message BrokerInfo {
string version = 1;
int32 node_id = 2;
string cluster_name = 3;
google.protobuf.Struct properties = 4;
}

// Hook definition
message Hook {
string name = 1; // Hook name
uint32 priority = 2; // Priority (0-1000)
google.protobuf.Struct filter = 3; // Filter conditions
}

message AuthenticateRequest {
string client_id = 1;
string username = 2;
string password = 3;
string client_ip = 4;
bytes client_cert = 5; // Client certificate
string protocol_version = 6;
google.protobuf.Struct properties = 7; // MQTT 5.0 properties
}

message AuthenticateResponse {
bool authenticated = 1; // Authentication result
repeated string permissions = 2; // Permission list
google.protobuf.Struct session_data = 3; // Session data
optional string error_reason = 4; // Failure reason
optional string tenant_id = 5; // Tenant ID
}

message AuthorizeRequest {
string tenant_id = 1;
string client_id = 2;
string username = 3;
AuthAction action = 4; // Action type
string topic = 5; // Topic
uint32 qos = 6; // QoS level
google.protobuf.Struct context = 7; // Context information
}

message AuthorizeResponse {
bool authorized = 1;
optional string reason = 2; // Denial reason
google.protobuf.Struct modified_context = 3; // Modified context
}

enum AuthAction {
AUTH_ACTION_UNSPECIFIED = 0;
AUTH_ACTION_CONNECT = 1;
AUTH_ACTION_PUBLISH = 2;
AUTH_ACTION_SUBSCRIBE = 3;
AUTH_ACTION_UNSUBSCRIBE = 4;
}

message MqttMessage {
string tenant_id = 1;
string client_id = 2; // Client ID
string topic = 3; // Topic
bytes payload = 4; // Payload data
uint32 qos = 5;// QoS level
bool retain = 6; // Retain flag
bool dup = 7; // Duplicate flag
google.protobuf.Timestamp publish_time = 8; // Publish time
google.protobuf.Struct properties = 9; // MQTT 5.0 properties
optional string message_id = 10; // Message ID
}

// Message publish handling
message MessagePublishRequest {
MqttMessage message = 1;
google.protobuf.Struct context = 2; // Context information
}

message MessagePublishResponse {
bool allow = 1; // Whether publishing is allowed
optional MqttMessage modified_message = 2; // Modified message
optional string error_reason = 3; // Denial reason
optional bool continue_chain = 4;
}

// Subscription handling
message SubscribeRequest {
string client_id = 1;
repeated TopicFilter subscriptions = 2;
google.protobuf.Struct context = 3;
}

message SubscribeResponse {
repeated SubscribeResult results = 1;
optional bool continue_chain = 2;
}

message TopicFilter {
string topic = 1;
uint32 qos = 2;
google.protobuf.Struct options = 3; // Subscription options
}

message SubscribeResult {
string topic = 1;
bool allowed = 2;
uint32 granted_qos = 3;
optional string reason = 4;
}

// Client events
message ClientConnectedEvent {
string tenant_id = 1;
string client_id = 2;
string client_ip = 3;
string protocol_version = 4;
google.protobuf.Timestamp connect_time = 5;
google.protobuf.Struct client_info = 6;
}
message ClientDisconnectedEvent {
string tenant_id = 1;
string client_id = 2;
string reason = 3; // Disconnect reason
google.protobuf.Timestamp disconnect_time = 4;
google.protobuf.Struct session_info = 5;
}

// Statistics information
message StatsRequest {
repeated string metrics = 1; // Requested metrics
google.protobuf.Timestamp start_time = 2;
google.protobuf.Timestamp end_time = 3;
}

message StatsResponse {
map<string, MetricValue> metrics = 1;
google.protobuf.Timestamp timestamp = 2;
}

message MetricValue {
oneof value {
int64 int_value = 1;
double double_value = 2;
string string_value = 3;
google.protobuf.Struct struct_value = 4;
}
}

message Pong { }

message BatchRequest {
repeated ProtocolMessage messages = 1;
bool ordered = 2;
uint32 timeout_ms = 3;
}

message BatchResponse {
repeated ProtocolMessage responses = 1;
repeated ErrorInfo errors = 2;
google.protobuf.Struct stats = 3;
}

message PluginMetadata {
string name = 1;
string version = 2;
string description = 3;
string author = 4;
repeated string capabilities = 5;
google.protobuf.Struct config_schema = 6;
repeated Dependency dependencies = 7;
}

message Dependency {
string name = 1;
string version_constraint = 2;
bool optional = 3;
}