Protocol Definition
The communication between the YedMQ broker and its plugins is based on Protocol Buffers (Protobuf). All messages are wrapped in a generic ProtocolMessage envelope.
Source Fileβ
The authoritative definition of the plugin protocol can be found in the following file within the YedMQ source repository:
YedMQ/plugin_protocol/proto/yedmq_plugin_protocol.proto
Developers should use this file to generate Protobuf bindings in their preferred programming language.
Message Overviewβ
ProtocolMessageβ
Every IPC frame payload is a ProtocolMessage. It contains metadata and a variant field (using google.protobuf.Any) for the actual request, response, or notification data.
type: Indicates if the message is aREQUEST,RESPONSE,NOTIFICATION,EVENT, orERROR.method: For requests, specifies the action to be performed (e.g.,INITIALIZE,AUTHENTICATE).params: AnAnyfield containing the request parameters.result: AnAnyfield containing the successful response data.error: Contains error details if the operation failed.
Key Message Typesβ
| Message Type | Description |
|---|---|
InitializeRequest | Sent by the broker to start the handshake with a plugin. |
InitializeResponse | Sent by the plugin to provide its capabilities and hooks. |
AuthenticateRequest | Sent by the broker to request client authentication. |
AuthenticateResponse | Sent by the plugin with the authentication result. |
AuthorizeRequest | Sent by the broker to request authorization for a specific action (Connect, Publish, Subscribe). |
AuthorizeResponse | Sent by the plugin with the authorization result. |
MqttMessage | A structure representing an MQTT message, used in various hooks. |
ClientConnectedEvent | A notification sent when a client successfully connects. |
ClientDisconnectedEvent | A notification sent when a client disconnects. |
Full Proto Definitionβ
Below is the full content of yedmq_plugin_protocol.proto for quick reference:
syntax = "proto3";
package plugin_protocol;
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";
// Protocol message body
message ProtocolMessage {
string version = 1; // Protocol version, e.g. "1.0"
MessageType type = 2; // Message type
string id = 3; // Unique message identifier
google.protobuf.Timestamp timestamp = 4; // Timestamp
string source = 5; // Sender identifier
string target = 6; // Receiver identifier
// The following fields are used selectively based on type
optional Method method = 7; // Method type (used for requests)
google.protobuf.Any params = 8; // Parameters (used for requests/notifications)
google.protobuf.Any result = 9; // Result (used for responses)
optional ErrorInfo error = 10; // Error information (used for error responses)
// Extension fields
map<string, string> metadata = 15; // Metadata
}
// Error information
message ErrorInfo {
uint32 code = 1; // error code
string message = 2; // error message
google.protobuf.Struct details = 3; // additional error details
}
// Message type enum
enum MessageType {
MESSAGE_TYPE_UNSPECIFIED = 0;
MESSAGE_TYPE_REQUEST = 1;
MESSAGE_TYPE_RESPONSE = 2;
MESSAGE_TYPE_NOTIFICATION = 3;
MESSAGE_TYPE_EVENT = 4;
MESSAGE_TYPE_ERROR = 5;
MESSAGE_TYPE_BATCH_REQUEST = 6;
MESSAGE_TYPE_BATCH_RESPONSE = 7;
}
enum Method {
METHOD_UNSPECIFIED = 0;
// Lifecycle management (10-19)
METHOD_INITIALIZE = 10;
METHOD_SHUTDOWN = 11;
METHOD_PING = 12;
METHOD_REGISTER_PLUGIN = 13;
METHOD_UNREGISTER_PLUGIN = 14;
// Authentication and authorization (20-29)
METHOD_AUTHENTICATE = 20;
METHOD_AUTHORIZE = 21;
METHOD_GET_ACL = 23;
// MQTT message handling (30-39)
METHOD_ON_MESSAGE_PUBLISH = 30;
METHOD_ON_MESSAGE_SUBSCRIBE = 31;
METHOD_ON_MESSAGE_UNSUBSCRIBE = 32;
// Event notification (50-59)
METHOD_CLIENT_CONNECTED = 50;
METHOD_CLIENT_DISCONNECTED = 51;
METHOD_MESSAGE_PUBLISHED = 52;
METHOD_SUBSCRIPTION_ADDED = 53;
METHOD_SUBSCRIPTION_REMOVED = 54;
// Monitoring (60-69)
METHOD_GET_STATS = 60;
METHOD_RESET_STATS = 61;
}
// Plugin initialization
message InitializeRequest {
google.protobuf.Struct plugin_config = 1; // Plugin configuration
BrokerInfo broker_info = 2; // Broker information
repeated string required_capabilities = 3; // Required capabilities
}
message InitializeResponse {
string status = 1; // Status: "ready", "error"
repeated string capabilities = 2; // Capabilities provided by the plugin
repeated Hook hooks = 3; // Registered hooks
google.protobuf.Struct plugin_info = 4; // Plugin information
string auth_code = 5; // Authentication code for plugin registration
}
message BrokerInfo {
string version = 1;
int32 node_id = 2;
string cluster_name = 3;
google.protobuf.Struct properties = 4;
}
// Hook definition
message Hook {
string name = 1; // Hook name
uint32 priority = 2; // Priority (0-1000)
google.protobuf.Struct filter = 3; // Filter conditions
}
message AuthenticateRequest {
string client_id = 1;
string username = 2;
string password = 3;
string client_ip = 4;
bytes client_cert = 5; // Client certificate
string protocol_version = 6;
google.protobuf.Struct properties = 7; // MQTT 5.0 properties
}
message AuthenticateResponse {
bool authenticated = 1; // Authentication result
repeated string permissions = 2; // Permission list
google.protobuf.Struct session_data = 3; // Session data
optional string error_reason = 4; // Failure reason
optional string tenant_id = 5; // Tenant ID
}
message AuthorizeRequest {
string tenant_id = 1;
string client_id = 2;
string username = 3;
AuthAction action = 4; // Action type
string topic = 5; // Topic
uint32 qos = 6; // QoS level
google.protobuf.Struct context = 7; // Context information
}
message AuthorizeResponse {
bool authorized = 1;
optional string reason = 2; // Denial reason
google.protobuf.Struct modified_context = 3; // Modified context
}
enum AuthAction {
AUTH_ACTION_UNSPECIFIED = 0;
AUTH_ACTION_CONNECT = 1;
AUTH_ACTION_PUBLISH = 2;
AUTH_ACTION_SUBSCRIBE = 3;
AUTH_ACTION_UNSUBSCRIBE = 4;
}
message MqttMessage {
string tenant_id = 1;
string client_id = 2; // Client ID
string topic = 3; // Topic
bytes payload = 4; // Payload data
uint32 qos = 5;// QoS level
bool retain = 6; // Retain flag
bool dup = 7; // Duplicate flag
google.protobuf.Timestamp publish_time = 8; // Publish time
google.protobuf.Struct properties = 9; // MQTT 5.0 properties
optional string message_id = 10; // Message ID
}
// Message publish handling
message MessagePublishRequest {
MqttMessage message = 1;
google.protobuf.Struct context = 2; // Context information
}
message MessagePublishResponse {
bool allow = 1; // Whether publishing is allowed
optional MqttMessage modified_message = 2; // Modified message
optional string error_reason = 3; // Denial reason
optional bool continue_chain = 4;
}
// Subscription handling
message SubscribeRequest {
string client_id = 1;
repeated TopicFilter subscriptions = 2;
google.protobuf.Struct context = 3;
}
message SubscribeResponse {
repeated SubscribeResult results = 1;
optional bool continue_chain = 2;
}
message TopicFilter {
string topic = 1;
uint32 qos = 2;
google.protobuf.Struct options = 3; // Subscription options
}
message SubscribeResult {
string topic = 1;
bool allowed = 2;
uint32 granted_qos = 3;
optional string reason = 4;
}
// Client events
message ClientConnectedEvent {
string tenant_id = 1;
string client_id = 2;
string client_ip = 3;
string protocol_version = 4;
google.protobuf.Timestamp connect_time = 5;
google.protobuf.Struct client_info = 6;
}
message ClientDisconnectedEvent {
string tenant_id = 1;
string client_id = 2;
string reason = 3; // Disconnect reason
google.protobuf.Timestamp disconnect_time = 4;
google.protobuf.Struct session_info = 5;
}
// Statistics information
message StatsRequest {
repeated string metrics = 1; // Requested metrics
google.protobuf.Timestamp start_time = 2;
google.protobuf.Timestamp end_time = 3;
}
message StatsResponse {
map<string, MetricValue> metrics = 1;
google.protobuf.Timestamp timestamp = 2;
}
message MetricValue {
oneof value {
int64 int_value = 1;
double double_value = 2;
string string_value = 3;
google.protobuf.Struct struct_value = 4;
}
}
message Pong { }
message BatchRequest {
repeated ProtocolMessage messages = 1;
bool ordered = 2;
uint32 timeout_ms = 3;
}
message BatchResponse {
repeated ProtocolMessage responses = 1;
repeated ErrorInfo errors = 2;
google.protobuf.Struct stats = 3;
}
message PluginMetadata {
string name = 1;
string version = 2;
string description = 3;
string author = 4;
repeated string capabilities = 5;
google.protobuf.Struct config_schema = 6;
repeated Dependency dependencies = 7;
}
message Dependency {
string name = 1;
string version_constraint = 2;
bool optional = 3;
}