协议定义
YedMQ 代理与其插件之间的通信基于 Protocol Buffers (Protobuf)。所有消息都包装在一个通用的 ProtocolMessage 信封中。
源文件
插件协议的权威定义可以在 YedMQ 源码库的以下文件中找到:
YedMQ/plugin_protocol/proto/yedmq_plugin_protocol.proto
开发者应使用此文件在他们喜欢的编程语言中生成 Protobuf 绑定。
消息概览
ProtocolMessage
每个 IPC 帧的有效载荷都是一个 ProtocolMessage。它包含元数据和一个变体字段(使用 google.protobuf.Any),用于存储实际的请求、 响应或通知数据。
type: 指示消息是REQUEST(请求)、RESPONSE(响应)、NOTIFICATION(通知)、EVENT(事件)还是ERROR(错误)。method: 对于请求,指定要执行的操作(例如INITIALIZE,AUTHENTICATE)。params: 一个Any字段,包含请求参数。result: 一个Any字段,包含成功的响应数据。error: 如果操作失败,则包含错误详情。
关键消息类型
| 消息类型 | 描述 |
|---|---|
InitializeRequest | 由代理发送,用于启动与插件的握手。 |
InitializeResponse | 由插件发送,提供其能力(capabilities)和钩子(hooks)。 |
AuthenticateRequest | 由代理发送,请求客户端身份验证。 |
AuthenticateResponse | 由插件发送,返回身份验证结果。 |
AuthorizeRequest | 由代理发送,请求对特定操作(连接、发布、订阅)进行授权。 |
AuthorizeResponse | 由插件发送,返回授权结果。 |
MqttMessage | 代表 MQTT 消息的结构,用于各种钩子。 |
ClientConnectedEvent | 客户端成功连接时发送的通知。 |
ClientDisconnectedEvent | 客户端断开连接时发送的通知。 |
完整 Proto 定义
以下是 yedmq_plugin_protocol.proto 的完整内容,供快速参考:
syntax = "proto3";
package plugin_protocol;
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/struct.proto";
// Protocol message body
message ProtocolMessage {
string version = 1; // Protocol version, e.g. "1.0"
MessageType type = 2; // Message type
string id = 3; // Unique message identifier
google.protobuf.Timestamp timestamp = 4; // Timestamp
string source = 5; // Sender identifier
string target = 6; // Receiver identifier
// The following fields are used selectively based on type
optional Method method = 7; // Method type (used for requests)
google.protobuf.Any params = 8; // Parameters (used for requests/notifications)
google.protobuf.Any result = 9; // Result (used for responses)
optional ErrorInfo error = 10; // Error information (used for error responses)
// Extension fields
map<string, string> metadata = 15; // Metadata
}
// Error information
message ErrorInfo {
uint32 code = 1; // error code
string message = 2; // error message
google.protobuf.Struct details = 3; // additional error details
}
// Message type enum
enum MessageType {
MESSAGE_TYPE_UNSPECIFIED = 0;
MESSAGE_TYPE_REQUEST = 1;
MESSAGE_TYPE_RESPONSE = 2;
MESSAGE_TYPE_NOTIFICATION = 3;
MESSAGE_TYPE_EVENT = 4;
MESSAGE_TYPE_ERROR = 5;
MESSAGE_TYPE_BATCH_REQUEST = 6;
MESSAGE_TYPE_BATCH_RESPONSE = 7;
}
enum Method {
METHOD_UNSPECIFIED = 0;
// Lifecycle management (10-19)
METHOD_INITIALIZE = 10;
METHOD_SHUTDOWN = 11;
METHOD_PING = 12;
METHOD_REGISTER_PLUGIN = 13;
METHOD_UNREGISTER_PLUGIN = 14;
// Authentication and authorization (20-29)
METHOD_AUTHENTICATE = 20;
METHOD_AUTHORIZE = 21;
METHOD_GET_ACL = 23;
// MQTT message handling (30-39)
METHOD_ON_MESSAGE_PUBLISH = 30;
METHOD_ON_MESSAGE_SUBSCRIBE = 31;
METHOD_ON_MESSAGE_UNSUBSCRIBE = 32;
// Event notification (50-59)
METHOD_CLIENT_CONNECTED = 50;
METHOD_CLIENT_DISCONNECTED = 51;
METHOD_MESSAGE_PUBLISHED = 52;
METHOD_SUBSCRIPTION_ADDED = 53;
METHOD_SUBSCRIPTION_REMOVED = 54;
// Monitoring (60-69)
METHOD_GET_STATS = 60;
METHOD_RESET_STATS = 61;
}
// Plugin initialization
message InitializeRequest {
google.protobuf.Struct plugin_config = 1; // Plugin configuration
BrokerInfo broker_info = 2; // Broker information
repeated string required_capabilities = 3; // Required capabilities
}
message InitializeResponse {
string status = 1; // Status: "ready", "error"
repeated string capabilities = 2; // Capabilities provided by the plugin
repeated Hook hooks = 3; // Registered hooks
google.protobuf.Struct plugin_info = 4; // Plugin information
string auth_code = 5; // Authentication code for plugin registration
}
message BrokerInfo {
string version = 1;
int32 node_id = 2;
string cluster_name = 3;
google.protobuf.Struct properties = 4;
}
// Hook definition
message Hook {
string name = 1; // Hook name
uint32 priority = 2; // Priority (0-1000)
google.protobuf.Struct filter = 3; // Filter conditions
}
message AuthenticateRequest {
string client_id = 1;
string username = 2;
string password = 3;
string client_ip = 4;
bytes client_cert = 5; // Client certificate
string protocol_version = 6;
google.protobuf.Struct properties = 7; // MQTT 5.0 properties
}
message AuthenticateResponse {
bool authenticated = 1; // Authentication result
repeated string permissions = 2; // Permission list
google.protobuf.Struct session_data = 3; // Session data
optional string error_reason = 4; // Failure reason
optional string tenant_id = 5; // Tenant ID
}
message AuthorizeRequest {
string tenant_id = 1;
string client_id = 2;
string username = 3;
AuthAction action = 4; // Action type
string topic = 5; // Topic
uint32 qos = 6; // QoS level
google.protobuf.Struct context = 7; // Context information
}
message AuthorizeResponse {
bool authorized = 1;
optional string reason = 2; // Denial reason
google.protobuf.Struct modified_context = 3; // Modified context
}
enum AuthAction {
AUTH_ACTION_UNSPECIFIED = 0;
AUTH_ACTION_CONNECT = 1;
AUTH_ACTION_PUBLISH = 2;
AUTH_ACTION_SUBSCRIBE = 3;
AUTH_ACTION_UNSUBSCRIBE = 4;
}
message MqttMessage {
string tenant_id = 1;
string client_id = 2; // Client ID
string topic = 3; // Topic
bytes payload = 4; // Payload data
uint32 qos = 5;// QoS level
bool retain = 6; // Retain flag
bool dup = 7; // Duplicate flag
google.protobuf.Timestamp publish_time = 8; // Publish time
google.protobuf.Struct properties = 9; // MQTT 5.0 properties
optional string message_id = 10; // Message ID
}
// Message publish handling
message MessagePublishRequest {
MqttMessage message = 1;
google.protobuf.Struct context = 2; // Context information
}
message MessagePublishResponse {
bool allow = 1; // Whether publishing is allowed
optional MqttMessage modified_message = 2; // Modified message
optional string error_reason = 3; // Denial reason
optional bool continue_chain = 4;
}
// Subscription handling
message SubscribeRequest {
string client_id = 1;
repeated TopicFilter subscriptions = 2;
google.protobuf.Struct context = 3;
}
message SubscribeResponse {
repeated SubscribeResult results = 1;
optional bool continue_chain = 2;
}
message TopicFilter {
string topic = 1;
uint32 qos = 2;
google.protobuf.Struct options = 3; // Subscription options
}
message SubscribeResult {
string topic = 1;
bool allowed = 2;
uint32 granted_qos = 3;
optional string reason = 4;
}
// Client events
message ClientConnectedEvent {
string tenant_id = 1;
string client_id = 2;
string client_ip = 3;
string protocol_version = 4;
google.protobuf.Timestamp connect_time = 5;
google.protobuf.Struct client_info = 6;
}
message ClientDisconnectedEvent {
string tenant_id = 1;
string client_id = 2;
string reason = 3; // Disconnect reason
google.protobuf.Timestamp disconnect_time = 4;
google.protobuf.Struct session_info = 5;
}
// Statistics information
message StatsRequest {
repeated string metrics = 1; // Requested metrics
google.protobuf.Timestamp start_time = 2;
google.protobuf.Timestamp end_time = 3;
}
message StatsResponse {
map<string, MetricValue> metrics = 1;
google.protobuf.Timestamp timestamp = 2;
}
message MetricValue {
oneof value {
int64 int_value = 1;
double double_value = 2;
string string_value = 3;
google.protobuf.Struct struct_value = 4;
}
}
message Pong { }
message BatchRequest {
repeated ProtocolMessage messages = 1;
bool ordered = 2;
uint32 timeout_ms = 3;
}
message BatchResponse {
repeated ProtocolMessage responses = 1;
repeated ErrorInfo errors = 2;
google.protobuf.Struct stats = 3;
}
message PluginMetadata {
string name = 1;
string version = 2;
string description = 3;
string author = 4;
repeated string capabilities = 5;
google.protobuf.Struct config_schema = 6;
repeated Dependency dependencies = 7;
}
message Dependency {
string name = 1;
string version_constraint = 2;
bool optional = 3;
}