跳到主要内容

协议定义

YedMQ 代理与其插件之间的通信基于 Protocol Buffers (Protobuf)。所有消息都包装在一个通用的 ProtocolMessage 信封中。

源文件

插件协议的权威定义可以在 YedMQ 源码库的以下文件中找到:

YedMQ/plugin_protocol/proto/yedmq_plugin_protocol.proto

开发者应使用此文件在他们喜欢的编程语言中生成 Protobuf 绑定。

消息概览

ProtocolMessage

每个 IPC 帧的有效载荷都是一个 ProtocolMessage。它包含元数据和一个变体字段(使用 google.protobuf.Any),用于存储实际的请求、响应或通知数据。

  • type: 指示消息是 REQUEST(请求)、RESPONSE(响应)、NOTIFICATION(通知)、EVENT(事件)还是 ERROR(错误)。
  • method: 对于请求,指定要执行的操作(例如 INITIALIZE, AUTHENTICATE)。
  • params: 一个 Any 字段,包含请求参数。
  • result: 一个 Any 字段,包含成功的响应数据。
  • error: 如果操作失败,则包含错误详情。

关键消息类型

消息类型描述
InitializeRequest由代理发送,用于启动与插件的握手。
InitializeResponse由插件发送,提供其能力(capabilities)和钩子(hooks)。
AuthenticateRequest由代理发送,请求客户端身份验证。
AuthenticateResponse由插件发送,返回身份验证结果。
AuthorizeRequest由代理发送,请求对特定操作(连接、发布、订阅)进行授权。
AuthorizeResponse由插件发送,返回授权结果。
MqttMessage代表 MQTT 消息的结构,用于各种钩子。
ClientConnectedEvent客户端成功连接时发送的通知。
ClientDisconnectedEvent客户端断开连接时发送的通知。

完整 Proto 定义

以下是 yedmq_plugin_protocol.proto 的完整内容,供快速参考:

syntax = "proto3";

package plugin_protocol;

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";

// Protocol message body
message ProtocolMessage {
string version = 1; // Protocol version, e.g. "1.0"
MessageType type = 2; // Message type
string id = 3; // Unique message identifier
google.protobuf.Timestamp timestamp = 4; // Timestamp
string source = 5; // Sender identifier
string target = 6; // Receiver identifier

// The following fields are used selectively based on type
optional Method method = 7; // Method type (used for requests)
google.protobuf.Any params = 8; // Parameters (used for requests/notifications)
google.protobuf.Any result = 9; // Result (used for responses)
optional ErrorInfo error = 10; // Error information (used for error responses)

// Extension fields
map<string, string> metadata = 15; // Metadata
}

// Error information
message ErrorInfo {
uint32 code = 1; // error code
string message = 2; // error message
google.protobuf.Struct details = 3; // additional error details
}


// Message type enum
enum MessageType {
MESSAGE_TYPE_UNSPECIFIED = 0;
MESSAGE_TYPE_REQUEST = 1;
MESSAGE_TYPE_RESPONSE = 2;
MESSAGE_TYPE_NOTIFICATION = 3;
MESSAGE_TYPE_EVENT = 4;
MESSAGE_TYPE_ERROR = 5;
MESSAGE_TYPE_BATCH_REQUEST = 6;
MESSAGE_TYPE_BATCH_RESPONSE = 7;
}

enum Method {
METHOD_UNSPECIFIED = 0;

// Lifecycle management (10-19)
METHOD_INITIALIZE = 10;
METHOD_SHUTDOWN = 11;
METHOD_PING = 12;
METHOD_REGISTER_PLUGIN = 13;
METHOD_UNREGISTER_PLUGIN = 14;

// Authentication and authorization (20-29)
METHOD_AUTHENTICATE = 20;
METHOD_AUTHORIZE = 21;
METHOD_GET_ACL = 23;

// MQTT message handling (30-39)
METHOD_ON_MESSAGE_PUBLISH = 30;
METHOD_ON_MESSAGE_SUBSCRIBE = 31;
METHOD_ON_MESSAGE_UNSUBSCRIBE = 32;

// Event notification (50-59)
METHOD_CLIENT_CONNECTED = 50;
METHOD_CLIENT_DISCONNECTED = 51;
METHOD_MESSAGE_PUBLISHED = 52;
METHOD_SUBSCRIPTION_ADDED = 53;
METHOD_SUBSCRIPTION_REMOVED = 54;

// Monitoring (60-69)
METHOD_GET_STATS = 60;
METHOD_RESET_STATS = 61;
}

// Plugin initialization
message InitializeRequest {
google.protobuf.Struct plugin_config = 1; // Plugin configuration
BrokerInfo broker_info = 2; // Broker information
repeated string required_capabilities = 3; // Required capabilities
}


message InitializeResponse {
string status = 1; // Status: "ready", "error"
repeated string capabilities = 2; // Capabilities provided by the plugin
repeated Hook hooks = 3; // Registered hooks
google.protobuf.Struct plugin_info = 4; // Plugin information
string auth_code = 5; // Authentication code for plugin registration
}

message BrokerInfo {
string version = 1;
int32 node_id = 2;
string cluster_name = 3;
google.protobuf.Struct properties = 4;
}

// Hook definition
message Hook {
string name = 1; // Hook name
uint32 priority = 2; // Priority (0-1000)
google.protobuf.Struct filter = 3; // Filter conditions
}

message AuthenticateRequest {
string client_id = 1;
string username = 2;
string password = 3;
string client_ip = 4;
bytes client_cert = 5; // Client certificate
string protocol_version = 6;
google.protobuf.Struct properties = 7; // MQTT 5.0 properties
}

message AuthenticateResponse {
bool authenticated = 1; // Authentication result
repeated string permissions = 2; // Permission list
google.protobuf.Struct session_data = 3; // Session data
optional string error_reason = 4; // Failure reason
optional string tenant_id = 5; // Tenant ID
}

message AuthorizeRequest {
string tenant_id = 1;
string client_id = 2;
string username = 3;
AuthAction action = 4; // Action type
string topic = 5; // Topic
uint32 qos = 6; // QoS level
google.protobuf.Struct context = 7; // Context information
}

message AuthorizeResponse {
bool authorized = 1;
optional string reason = 2; // Denial reason
google.protobuf.Struct modified_context = 3; // Modified context
}

enum AuthAction {
AUTH_ACTION_UNSPECIFIED = 0;
AUTH_ACTION_CONNECT = 1;
AUTH_ACTION_PUBLISH = 2;
AUTH_ACTION_SUBSCRIBE = 3;
AUTH_ACTION_UNSUBSCRIBE = 4;
}

message MqttMessage {
string tenant_id = 1;
string client_id = 2; // Client ID
string topic = 3; // Topic
bytes payload = 4; // Payload data
uint32 qos = 5;// QoS level
bool retain = 6; // Retain flag
bool dup = 7; // Duplicate flag
google.protobuf.Timestamp publish_time = 8; // Publish time
google.protobuf.Struct properties = 9; // MQTT 5.0 properties
optional string message_id = 10; // Message ID
}

// Message publish handling
message MessagePublishRequest {
MqttMessage message = 1;
google.protobuf.Struct context = 2; // Context information
}

message MessagePublishResponse {
bool allow = 1; // Whether publishing is allowed
optional MqttMessage modified_message = 2; // Modified message
optional string error_reason = 3; // Denial reason
optional bool continue_chain = 4;
}

// Subscription handling
message SubscribeRequest {
string client_id = 1;
repeated TopicFilter subscriptions = 2;
google.protobuf.Struct context = 3;
}

message SubscribeResponse {
repeated SubscribeResult results = 1;
optional bool continue_chain = 2;
}

message TopicFilter {
string topic = 1;
uint32 qos = 2;
google.protobuf.Struct options = 3; // Subscription options
}

message SubscribeResult {
string topic = 1;
bool allowed = 2;
uint32 granted_qos = 3;
optional string reason = 4;
}

// Client events
message ClientConnectedEvent {
string tenant_id = 1;
string client_id = 2;
string client_ip = 3;
string protocol_version = 4;
google.protobuf.Timestamp connect_time = 5;
google.protobuf.Struct client_info = 6;
}
message ClientDisconnectedEvent {
string tenant_id = 1;
string client_id = 2;
string reason = 3; // Disconnect reason
google.protobuf.Timestamp disconnect_time = 4;
google.protobuf.Struct session_info = 5;
}

// Statistics information
message StatsRequest {
repeated string metrics = 1; // Requested metrics
google.protobuf.Timestamp start_time = 2;
google.protobuf.Timestamp end_time = 3;
}

message StatsResponse {
map<string, MetricValue> metrics = 1;
google.protobuf.Timestamp timestamp = 2;
}

message MetricValue {
oneof value {
int64 int_value = 1;
double double_value = 2;
string string_value = 3;
google.protobuf.Struct struct_value = 4;
}
}

message Pong { }

message BatchRequest {
repeated ProtocolMessage messages = 1;
bool ordered = 2;
uint32 timeout_ms = 3;
}

message BatchResponse {
repeated ProtocolMessage responses = 1;
repeated ErrorInfo errors = 2;
google.protobuf.Struct stats = 3;
}

message PluginMetadata {
string name = 1;
string version = 2;
string description = 3;
string author = 4;
repeated string capabilities = 5;
google.protobuf.Struct config_schema = 6;
repeated Dependency dependencies = 7;
}

message Dependency {
string name = 1;
string version_constraint = 2;
bool optional = 3;
}