飞书服务端SDK已全面支持Java、Go、Python与Node.js等主流开发语言,然而.NET生态系统的开发者们却面临着官方SDK缺失的困境,这无疑为.NET社区接入飞书平台带来了不便。
一、.net中如何实现飞书WebSocket长连接
为什么选择飞书WebSocket?
相较于传统的Webhook模式,长连接模式大大降低了接入成本,将原先1周左右的开发周期降低到5分钟。
核心优势:
- 开发便捷:无需公网IP或域名,本地环境即可接收回调
- 安全传输:内置加密和鉴权,无需额外安全处理
- 实时性强:消息延迟从分钟级降至毫秒级
- 资源高效:避免频繁HTTP请求,连接复用多种事件类型
企业级应用场景
飞书平台提供丰富的事件类型,支持:
- 用户事件:员工入职/离职自动化处理
- 消息事件:智能客服、消息机器人
- 审批事件:OA系统集成、自动审批
- 部门事件:组织架构同步管理
- 其它事件:.....
Mud.Feishu架构设计

二、抽象层设计(Mud.Feishu.Abstractions)
事件处理策略模式
Mud.Feishu采用策略模式实现灵活的事件处理机制,核心接口设计简洁而强大:
public interface IFeishuEventHandler
{
string SupportedEventType { get; }
Task HandleAsync(EventData eventData, CancellationToken cancellationToken = default);
}
策略模式核心优势:
- 🎯 单一职责:每个处理器专注特定事件类型
- 🔧 开闭原则:对扩展开放,对修改封闭
- 🧪 可测试性:独立测试,依赖清晰
- ⚡ 运行时多态:动态选择处理策略
实际应用示例:
public class UserCreatedEventHandler : IFeishuEventHandler
{
public string SupportedEventType => "contact.user.created_v3";
public async Task HandleAsync(EventData eventData, CancellationToken cancellationToken = default)
{
var user = JsonSerializer.Deserialize<UserData>(eventData.EventJson);
await _userService.CreateUserAsync(user, cancellationToken);
_logger.LogInformation("用户创建事件处理完成: {UserId}", user.UserId);
}
}
public class MessageReceiveEventHandler : IFeishuEventHandler
{
public string SupportedEventType => "im.message.receive_v1";
public async Task HandleAsync(EventData eventData, CancellationToken cancellationToken = default)
{
var message = JsonSerializer.Deserialize<MessageData>(eventData.EventJson);
await _messageService.ProcessMessageAsync(message, cancellationToken);
}
}
事件类型与数据模型
EventData统一事件模型
public class EventData
{
public string EventId { get; set; } = string.Empty;
public string EventType { get; set; } = string.Empty;
public DateTime EventTime { get; set; } = DateTime.UtcNow;
public object? Event { get; set; }
public string EventJson { get; set; } = string.Empty;
}
主要事件类型
| 类别 | 事件类型 | 说明 |
|---|
| 用户管理 | contact.user.created_v3 | 用户创建 |
| contact.user.updated_v3 | 用户更新 |
| contact.user.deleted_v3 | 用户删除 |
| 消息事件 | im.message.receive_v1 | 接收消息 |
| im.message.message_read_v1 | 消息已读 |
| 部门管理 | contact.department.created_v3 | 部门创建 |
| contact.department.updated_v3 | 部门更新 |
| 审批流程 | approval.approval.approved_v1 | 审批通过 |
| approval.approval.rejected_v1 | 审批拒绝 |
| ... | ... | ... |
强类型数据模型示例
public class UserCreatedEvent
{
[JsonPropertyName("user_id")]
public string UserId { get; set; }
[JsonPropertyName("name")]
public string Name { get; set; }
[JsonPropertyName("email")]
public string Email { get; set; }
}
public class MessageReceiveEvent
{
[JsonPropertyName("message_id")]
public string MessageId { get; set; }
[JsonPropertyName("chat_id")]
public string ChatId { get; set; }
[JsonPropertyName("content")]
public string Content { get; set; }
}
工厂模式应用
事件处理器工厂
public interface IFeishuEventHandlerFactory
{
IFeishuEventHandler? GetHandler(string eventType);
void RegisterHandler(IFeishuEventHandler handler);
IReadOnlyList<string> GetRegisteredEventTypes();
}
使用示例:
var factory = serviceProvider.GetService<IFeishuEventHandlerFactory>();
factory.RegisterHandler(new UserCreatedEventHandler());
factory.RegisterHandler(new MessageReceiveEventHandler());
var handler = factory.GetHandler("contact.user.created_v3");
if (handler != null)
{
await handler.HandleAsync(eventData);
}
三、核心实现层(Mud.Feishu.WebSocket)
组件化架构设计
Mud.Feishu.WebSocket采用严格的组件化设计,包含四个核心组件:
3.1 连接管理器
核心职责
- 连接生命周期管理:建立、维护、恢复WebSocket连接
- 线程安全:使用SemaphoreSlim确保并发安全
- 超时控制:多级取消令牌支持精确超时控制
- 自动重连:智能重连策略,支持指数退避算法
连接状态管理
关键实现特点
- 事件驱动通知机制(Connected/Disconnected/Error事件)
- 线程安全的连接操作
- 灵活的重连策略配置
3.2 认证管理器
认证流程设计
核心功能
- 令牌验证:应用访问令牌有效性检查
- 消息构建:标准化认证消息格式
- 状态管理:认证状态实时跟踪
- 自动重试:认证失败智能重试机制
认证消息格式
{
"timestamp": 1703920800,
"data": {
"app_access_token": "your_app_access_token_here"
}
}
认证机制架构设计
认证管理器采用命令模式和回调机制,将认证逻辑与网络通信解耦,确保认证过程的可测试性和可扩展性。
public class AuthenticationManager
{
private readonly ILogger<AuthenticationManager> _logger;
private readonly Func<string, Task> _sendMessageCallback;
private bool _isAuthenticated = false;
private readonly FeishuWebSocketOptions _options;
public event EventHandler<EventArgs>? Authenticated;
public event EventHandler<WebSocketErrorEventArgs>? AuthenticationFailed;
public bool IsAuthenticated => _isAuthenticated;
public AuthenticationManager(
ILogger<AuthenticationManager> logger,
FeishuWebSocketOptions options,
Func<string, Task> sendMessageCallback)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_sendMessageCallback = sendMessageCallback ?? throw new ArgumentNullException(nameof(sendMessageCallback));
_options = options;
}
public async Task AuthenticateAsync(string appAccessToken, CancellationToken cancellationToken = default)
{
if (string.IsNullOrEmpty(appAccessToken))
throw new ArgumentException("应用访问令牌不能为空", nameof(appAccessToken));
try
{
_logger.LogInformation("正在进行WebSocket认证...");
_isAuthenticated = false;
var authMessage = new AuthMessage
{
Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
Data = new AuthData
{
AppAccessToken = appAccessToken
}
};
var jsonOptions = new JsonSerializerOptions
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
WriteIndented = false
};
var authJson = JsonSerializer.Serialize(authMessage, jsonOptions);
await _sendMessageCallback(authJson);
_logger.LogInformation("已发送认证消息,等待响应...");
}
catch (Exception ex)
{
_isAuthenticated = false;
_logger.LogError(ex, "WebSocket认证失败");
var errorArgs = new WebSocketErrorEventArgs
{
Exception = ex,
ErrorMessage = $"WebSocket认证失败: {ex.Message}",
ErrorType = ex.GetType().Name,
IsAuthError = true
};
AuthenticationFailed?.Invoke(this, errorArgs);
throw;
}
}
public void HandleAuthResponse(string responseMessage)
{
try
{
var authResponse = JsonSerializer.Deserialize<AuthResponseMessage>(responseMessage);
if (authResponse?.Code == 0)
{
_isAuthenticated = true;
_logger.LogInformation("WebSocket认证成功: {Message}", authResponse.Message);
Authenticated?.Invoke(this, EventArgs.Empty);
}
else
{
_isAuthenticated = false;
_logger.LogError("WebSocket认证失败: {Code} - {Message}",
authResponse?.Code, authResponse?.Message);
var errorArgs = new WebSocketErrorEventArgs
{
ErrorMessage = $"WebSocket认证失败: {authResponse?.Code} - {authResponse?.Message}",
IsAuthError = true
};
AuthenticationFailed?.Invoke(this, errorArgs);
}
}
catch (JsonException ex)
{
_isAuthenticated = false;
_logger.LogError(ex, "解析认证响应失败: {Message}", responseMessage);
var errorArgs = new WebSocketErrorEventArgs
{
Exception = ex,
ErrorMessage = $"解析认证响应失败: {ex.Message}",
ErrorType = ex.GetType().Name,
IsAuthError = true
};
AuthenticationFailed?.Invoke(this, errorArgs);
}
catch (Exception ex)
{
_isAuthenticated = false;
_logger.LogError(ex, "处理认证响应时发生错误");
var errorArgs = new WebSocketErrorEventArgs
{
Exception = ex,
ErrorMessage = $"处理认证响应时发生错误: {ex.Message}",
ErrorType = ex.GetType().Name,
IsAuthError = true
};
AuthenticationFailed?.Invoke(this, errorArgs);
}
}
public void ResetAuthentication()
{
_isAuthenticated = false;
_logger.LogDebug("已重置认证状态");
}
}
认证流程详细机制
认证管理器的实现遵循安全性和可靠性原则,确保认证过程的安全和稳定:
参数验证机制:在认证开始前进行严格的参数验证,包括:
- AppAccessToken的非空检查
- 令牌格式验证
- 权限范围确认
消息构建策略:采用标准化的认证消息格式,包含:
状态管理:通过内部状态标志确保认证状态的一致性:
_isAuthenticated标志控制认证状态- 状态变化时触发相应事件
- 支持状态查询和重置
异常处理机制:完善的错误处理和恢复策略:
认证交互时序图
核心认证消息格式
飞书WebSocket认证使用标准化的JSON消息格式:
{
"timestamp": 1703920800,
"data": {
"app_access_token": "your_app_access_token_here"
}
}
响应消息格式
{
"code": 0,
"msg": "success",
"data": {
"session_id": "websocket_session_id",
"expires_in": 3600
}
}
认证安全考虑
令牌安全:
- AppAccessToken不应在代码中硬编码
- 支持令牌自动刷新机制
- 令牌存储和传输加密
防重放攻击:
- 使用时间戳验证消息新鲜度
- 支持消息签名机制
- 会话唯一性标识
错误处理:
- 敏感信息不记录到日志
- 认证失败不暴露具体错误
- 支持优雅降级处理
3.3 消息路由器(MessageRouter)
消息路由器是飞书WebSocket框架的消息分发核心,负责识别消息类型、版本信息,并将消息精确路由到对应的处理器。它采用策略模式和责任链模式的组合,确保消息处理的高效性和可扩展性。
消息路由架构

消息类型识别
消息路由器首先需要对接收到的消息进行类型和版本识别,以确定合适的处理策略。飞书WebSocket支持多种消息格式,包括v1.0和v2.0协议版本。
public interface IMessageHandler
{
bool CanHandle(string messageType);
Task HandleAsync(string message, CancellationToken cancellationToken = default);
}
public class MessageRouter
{
private readonly ILogger<MessageRouter> _logger;
private readonly List<IMessageHandler> _handlers;
private readonly FeishuWebSocketOptions _options;
public MessageRouter(ILogger<MessageRouter> logger, FeishuWebSocketOptions options)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_handlers = new List<IMessageHandler>();
_options = options;
}
public void RegisterHandler(IMessageHandler handler)
{
if (handler == null)
throw new ArgumentNullException(nameof(handler));
_handlers.Add(handler);
_logger.LogDebug("已注册消息处理器: {HandlerType}", handler.GetType().Name);
}
public bool UnregisterHandler(IMessageHandler handler)
{
var removed = _handlers.Remove(handler);
if (removed)
{
if (_options.EnableLogging)
{
_logger.LogDebug("已移除消息处理器: {HandlerType}", handler.GetType().Name);
}
}
return removed;
}
public async Task RouteMessageAsync(string message, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(message))
{
if (_options.EnableLogging)
{
_logger.LogWarning("收到空消息,跳过路由");
}
return;
}
await RouteMessageInternalAsync(message, "Text", cancellationToken);
}
public async Task RouteBinaryMessageAsync(string jsonContent, string messageType, CancellationToken cancellationToken = default)
{
if (string.IsNullOrWhiteSpace(jsonContent))
{
if (_options.EnableLogging)
{
_logger.LogWarning("收到空的二进制转换消息,跳过路由");
}
return;
}
await RouteMessageInternalAsync(jsonContent, $"Binary_{messageType}", cancellationToken);
}
private string ExtractMessageType(string message)
{
try
{
using var jsonDoc = System.Text.Json.JsonDocument.Parse(message);
var root = jsonDoc.RootElement;
if (root.TryGetProperty("schema", out var schemaElement) &&
schemaElement.GetString() == "2.0")
{
if (root.TryGetProperty("header", out var headerElement) &&
headerElement.TryGetProperty("event_type", out var eventTypeElement))
{
return "event";
}
}
if (root.TryGetProperty("type", out var typeElement))
{
return typeElement.GetString()?.ToLowerInvariant() ?? string.Empty;
}
return string.Empty;
}
catch (System.Text.Json.JsonException ex)
{
_logger.LogError(ex, "解析消息JSON失败: {Message}", message);
return string.Empty;
}
}
private async Task RouteMessageInternalAsync(string message, string sourceType, CancellationToken cancellationToken)
{
try
{
var messageType = ExtractMessageType(message);
if (string.IsNullOrEmpty(messageType))
{
_logger.LogWarning("无法提取消息类型 (来源: {SourceType}): {Message}", sourceType, message);
return;
}
var handler = _handlers.FirstOrDefault(h => h.CanHandle(messageType));
if (handler == null)
{
_logger.LogWarning("未找到能处理消息类型 {MessageType} 的处理器 (来源: {SourceType})", messageType, sourceType);
return;
}
_logger.LogDebug("将消息路由到处理器: {HandlerType} (来源: {SourceType}, 消息类型: {MessageType})",
handler.GetType().Name, sourceType, messageType);
await handler.HandleAsync(message, cancellationToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "路由消息时发生错误 (来源: {SourceType}): {Message}", sourceType, message);
}
}
}
消息路由流程详解
消息路由器的核心职责是根据消息特征进行智能分发,其工作流程包括以下几个关键步骤:
- 消息预验证:检查消息的基本格式和有效性
- 版本识别:通过schema字段或type字段确定消息版本
- 类型匹配:根据消息内容找到对应的处理器
- 异步分发:将消息异步发送给匹配的处理器
结果聚合:收集处理结果并进行后续处理

路由策略设计
消息路由器采用多维度匹配策略,确保消息能够精确路由:
基于版本的匹配:
- v1.0协议:通过
type字段识别消息类型 - v2.0协议:通过
schema字段确认版本 - 向后兼容:支持旧版本消息格式
基于类型的匹配:
- 事件消息:
event类型,如消息接收、状态变更 - 响应消息:
response类型,如认证响应、操作结果 - 通知消息:
notification类型,如系统通知
基于优先级的匹配:
- 高优先级处理器:认证、心跳等关键消息
- 中优先级处理器:业务事件、用户消息
- 低优先级处理器:统计数据、日志信息
处理器注册机制
消息路由器支持动态注册和注销消息处理器:
router.RegisterHandler(new AuthMessageHandler());
router.RegisterHandler(new EventMessageHandler());
router.RegisterHandler(new BinaryMessageHandler());
public interface IMessageHandler
{
bool CanHandle(string messageType, MessageVersionInfo version);
Task<HandlerResult> HandleAsync(string message, CancellationToken cancellationToken);
int Priority { get; }
}
性能优化策略
- 并行处理:支持多个消息并行处理,提高吞吐量
- 缓存机制:缓存处理器匹配结果,减少重复计算
- 批处理:支持批量消息处理,减少网络开销
- 连接池:复用连接资源,提高连接效率
错误处理机制
- 容错设计:单个处理器异常不影响其他处理器
- 降级策略:无匹配处理器时使用默认处理逻辑
- 重试机制:处理失败时支持自动重试
- 监控告警:记录处理异常并触发告警机制
处理器分发机制
public class RoutingStrategy
{
private readonly ILogger<MessageRouter> _logger;
private readonly FeishuWebSocketOptions _options;
public async Task RouteMessageAsync(string message, string sourceType,
List<IMessageHandler> handlers, CancellationToken cancellationToken)
{
var messageType = ExtractMessageType(message);
if (string.IsNullOrEmpty(messageType))
{
if (_options.EnableLogging)
_logger.LogWarning("无法提取消息类型 (来源: {SourceType}): {Message}", sourceType, message);
return;
}
var handler = handlers.FirstOrDefault(h => h.CanHandle(messageType));
if (handler == null)
{
if (_options.EnableLogging)
_logger.LogWarning("未找到能处理消息类型 {MessageType} 的处理器 (来源: {SourceType})",
messageType, sourceType);
return;
}
if (_options.EnableLogging)
_logger.LogDebug("将消息路由到处理器: {HandlerType} (来源: {SourceType}, 消息类型: {MessageType})",
handler.GetType().Name, sourceType, messageType);
await handler.HandleAsync(message, cancellationToken);
}
private string ExtractMessageType(string message)
{
try
{
using var jsonDoc = JsonDocument.Parse(message);
var root = jsonDoc.RootElement;
if (root.TryGetProperty("schema", out var schemaElement) &&
schemaElement.GetString() == "2.0")
{
if (root.TryGetProperty("header", out var headerElement) &&
headerElement.TryGetProperty("event_type", out var eventTypeElement))
{
return "event";
}
}
if (root.TryGetProperty("type", out var typeElement))
{
return typeElement.GetString()?.ToLowerInvariant() ?? string.Empty;
}
return string.Empty;
}
catch (JsonException ex)
{
_logger.LogError(ex, "解析消息JSON失败");
return string.Empty;
}
}
}
3.4 二进制消息处理器(BinaryMessageProcessor)
二进制消息处理器是飞书WebSocket框架中专门处理二进制数据流的核心组件,负责增量接收、数据组装、格式解析和消息分发。它采用流式处理架构,支持大消息的分片接收和解析。
二进制消息处理架构
增量数据接收
public class BinaryMessageProcessor : IDisposable
{
private readonly ILogger<BinaryMessageProcessor> _logger;
private readonly FeishuWebSocketOptions _options;
private MemoryStream? _binaryDataStream;
private readonly object _binaryDataStreamLock = new object();
private DateTime _binaryDataReceiveStartTime = DateTime.MinValue;
private bool _disposed = false;
private readonly MessageRouter? _messageRouter;
private readonly WebSocketConnectionManager? _connectionManager;
public event EventHandler<WebSocketBinaryMessageEventArgs>? BinaryMessageReceived;
public event EventHandler<WebSocketErrorEventArgs>? Error;
public async Task ProcessBinaryDataAsync(byte[] data, int offset, int count,
bool endOfMessage, CancellationToken cancellationToken = default)
{
try
{
lock (_binaryDataStreamLock)
{
if (_binaryDataStream == null)
{
_binaryDataStream = new MemoryStream();
_binaryDataReceiveStartTime = DateTime.UtcNow;
if (_options.EnableLogging)
_logger.LogDebug("开始接收新的二进制消息");
}
_binaryDataStream.Write(data, offset, count);
if (_binaryDataStream.Length > _options.MaxBinaryMessageSize)
{
var errorMessage = $"二进制消息大小超过限制 ({_binaryDataStream.Length} > {_options.MaxBinaryMessageSize})";
_logger.LogError(errorMessage);
_binaryDataStream.Dispose();
_binaryDataStream = null;
OnError(errorMessage, "MessageSizeExceeded");
return;
}
if (endOfMessage)
{
var completeData = _binaryDataStream.ToArray();
var receiveDuration = DateTime.UtcNow - _binaryDataReceiveStartTime;
if (_options.EnableLogging)
_logger.LogInformation("二进制消息接收完成,大小: {Size} 字节,耗时: {Duration}ms",
completeData.Length, receiveDuration.TotalMilliseconds);
_ = Task.Run(async () =>
{
await ProcessCompleteBinaryMessageAsync(completeData, cancellationToken);
}, cancellationToken);
_binaryDataStream.Dispose();
_binaryDataStream = null;
}
else
{
if (_options.EnableLogging)
_logger.LogDebug("已接收二进制消息片段,当前总大小: {Size} 字节",
_binaryDataStream.Length);
}
}
}
catch (Exception ex)
{
lock (_binaryDataStreamLock)
{
_binaryDataStream?.Dispose();
_binaryDataStream = null;
}
if (_options.EnableLogging)
_logger.LogError(ex, "处理二进制消息时发生错误");
OnError($"处理二进制消息时发生错误: {ex.Message}", ex.GetType().Name);
}
}
}
核心特性
- 📦 流式处理:支持大消息分片接收
- 🔄 双格式支持:ProtoBuf优先,JSON回退
- 📊 大小限制:可配置的消息大小限制
- 🎯 自动路由:解析后自动路由到消息处理器
处理流程
- 增量接收:分片接收二进制数据,写入内存流
- 大小检查:实时监控数据大小,防止内存溢出
- 格式解析:ProtoBuf反序列化,失败则回退到JSON
- 消息路由:提取JSON Payload,路由到对应处理器
- ACK确认:向服务器发送处理确认
主客户端集成(FeishuWebSocketClient)
FeishuWebSocketClient是整个框架的门面组件,负责协调和编排各个子组件的工作。它采用组件化设计模式,将复杂的WebSocket连接管理功能分解为独立的、可复用的组件,并通过统一的事件系统进行集成。
客户端架构集成

组件协调与编排
public sealed class FeishuWebSocketClient : IFeishuWebSocketClient, IDisposable
{
private readonly ILogger<FeishuWebSocketClient> _logger;
private readonly FeishuWebSocketOptions _options;
private readonly IFeishuEventHandlerFactory _eventHandlerFactory;
private readonly WebSocketConnectionManager _connectionManager;
private readonly AuthenticationManager _authManager;
private readonly MessageRouter _messageRouter;
private readonly BinaryMessageProcessor _binaryProcessor;
private readonly ConcurrentQueue<string> _messageQueue = new();
private readonly List<Func<string, Task>> _messageProcessors = new();
private Task? _messageProcessingTask;
private ILoggerFactory _loggerFactory;
private bool _disposed = false;
private CancellationTokenSource? _cancellationTokenSource;
public WebSocketState State => _connectionManager.State;
public bool IsAuthenticated => _authManager.IsAuthenticated;
public event EventHandler<EventArgs>? Connected;
public event EventHandler<WebSocketCloseEventArgs>? Disconnected;
public event EventHandler<WebSocketMessageEventArgs>? MessageReceived;
public event EventHandler<WebSocketErrorEventArgs>? Error;
public event EventHandler<EventArgs>? Authenticated;
public event EventHandler<WebSocketPingEventArgs>? PingReceived;
public event EventHandler<WebSocketPongEventArgs>? PongReceived;
public event EventHandler<WebSocketHeartbeatEventArgs>? HeartbeatReceived;
public event EventHandler<WebSocketFeishuEventArgs>? FeishuEventReceived;
public event EventHandler<WebSocketBinaryMessageEventArgs>? BinaryMessageReceived;
public FeishuWebSocketClient(
ILogger<FeishuWebSocketClient> logger,
IFeishuEventHandlerFactory eventHandlerFactory,
ILoggerFactory loggerFactory,
FeishuWebSocketOptions? options = null)
{
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_eventHandlerFactory = eventHandlerFactory ?? throw new ArgumentNullException(nameof(eventHandlerFactory));
_options = options ?? new FeishuWebSocketOptions();
_loggerFactory = loggerFactory;
_connectionManager = new WebSocketConnectionManager(_loggerFactory.CreateLogger<WebSocketConnectionManager>(), _options);
_authManager = new AuthenticationManager(_loggerFactory.CreateLogger<AuthenticationManager>(), _options, (message) => SendMessageAsync(message));
_messageRouter = new MessageRouter(_loggerFactory.CreateLogger<MessageRouter>(), _options);
_binaryProcessor = new BinaryMessageProcessor(_loggerFactory.CreateLogger<BinaryMessageProcessor>(), _connectionManager, _options, _messageRouter);
SubscribeToComponentEvents();
RegisterMessageHandlers();
}
private void SubscribeToComponentEvents()
{
_connectionManager.Connected += (s, e) => Connected?.Invoke(this, e);
_connectionManager.Disconnected += (s, e) => Disconnected?.Invoke(this, e);
_connectionManager.Error += (s, e) => Error?.Invoke(this, e);
_authManager.Authenticated += (s, e) => Authenticated?.Invoke(this, e);
_authManager.AuthenticationFailed += (s, e) => Error?.Invoke(this, e);
_binaryProcessor.BinaryMessageReceived += (s, e) => BinaryMessageReceived?.Invoke(this, e);
_binaryProcessor.Error += (s, e) => Error?.Invoke(this, e);
}
private void RegisterMessageHandlers()
{
var pingPongHandler = new PingPongMessageHandler(
_loggerFactory.CreateLogger<PingPongMessageHandler>(),
_options,
(message) => SendMessageAsync(message));
var authHandler = new AuthMessageHandler(
_loggerFactory.CreateLogger<AuthMessageHandler>(),
(success) =>
{
if (success)
{
_authManager.HandleAuthResponse("{\"code\":0,\"msg\":\"Authentication successful\"}");
}
else
{
_authManager.HandleAuthResponse("{\"code\":-1,\"msg\":\"Authentication failed\"}");
}
});
var heartbeatHandler = new HeartbeatMessageHandler(_loggerFactory.CreateLogger<HeartbeatMessageHandler>(), _options);
_messageRouter.RegisterHandler(pingPongHandler);
_messageRouter.RegisterHandler(authHandler);
_messageRouter.RegisterHandler(heartbeatHandler);
}
public async Task ConnectAsync(WsEndpointResult endpoint, CancellationToken cancellationToken = default)
{
if (endpoint == null)
throw new ArgumentNullException(nameof(endpoint));
await _connectionManager.ConnectAsync(endpoint.Url, cancellationToken);
_cancellationTokenSource = new CancellationTokenSource();
_ = Task.Run(() => StartReceivingAsyncInternal(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
_ = Task.Run(() => StartHeartbeatAsync(_cancellationTokenSource.Token), _cancellationTokenSource.Token);
if (_options.EnableMessageQueue)
{
_messageProcessingTask = ProcessMessageQueueAsync(_cancellationTokenSource.Token);
}
}
public async Task ConnectAsync(WsEndpointResult endpoint, string appAccessToken, CancellationToken cancellationToken = default)
{
await ConnectAsync(endpoint, cancellationToken);
await _authManager.AuthenticateAsync(appAccessToken, cancellationToken);
}
public async Task DisconnectAsync(CancellationToken cancellationToken = default)
{
_cancellationTokenSource?.Cancel();
await _connectionManager.DisconnectAsync(cancellationToken);
}
public async Task SendMessageAsync(string message, CancellationToken cancellationToken = default)
{
await _connectionManager.SendMessageAsync(message, cancellationToken);
}
public void RegisterMessageProcessor(Func<string, Task> processor)
{
if (processor == null)
throw new ArgumentNullException(nameof(processor));
_messageProcessors.Add(processor);
}
}
客户端生命周期管理

组件协作时序图
核心设计模式
门面模式(Facade Pattern):
- FeishuWebSocketClient作为统一入口
- 隐藏内部组件复杂性
- 提供简洁的API接口
观察者模式(Observer Pattern):
- 事件驱动的组件通信
- 松耦合的组件协作
- 支持多订阅者监听
策略模式(Strategy Pattern):
工厂模式(Factory Pattern):
- EventHandlerFactory创建处理器
- 统一的组件初始化
- 依赖注入支持
四、应用示例
4.1 事件处理器实现
用户创建事件处理器示例
public class DemoUserEventHandler : DefaultFeishuEventHandler<UserCreatedResult>
{
private readonly DemoEventService _eventService;
private readonly INotificationService _notificationService;
public DemoUserEventHandler(
ILogger<DemoUserEventHandler> logger,
INotificationService notificationService) : base(logger)
{
_eventService = eventService ?? throw new ArgumentNullException(nameof(eventService));
_notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));
}
protected override async Task ProcessBusinessLogicAsync(
EventData eventData,
UserCreatedResult? userCreated,
CancellationToken cancellationToken = default)
{
if (eventData == null)
throw new ArgumentNullException(nameof(eventData));
_logger.LogInformation("👤 [用户事件] 开始处理用户创建事件: {EventId}", eventData.EventId);
try
{
var user = ValidateAndTransformUser(userCreated?.Object);
await _eventService.RecordUserEventAsync(user, cancellationToken);
await ProcessUserEventAsync(user, cancellationToken);
_eventService.IncrementUserCount();
_logger.LogInformation("✅ [用户事件] 用户创建事件处理完成: 用户ID {UserId}, 用户名 {UserName}",
user.UserId, user.UserName);
}
catch (Exception ex)
{
_logger.LogError(ex, "❌ [用户事件] 处理用户创建事件失败: {EventId}", eventData.EventId);
throw;
}
}
private UserData ValidateAndTransformUser(UserCreatedResult? userCreated)
{
if (userCreated == null)
throw new ArgumentException("用户创建数据不能为空");
if (string.IsNullOrWhiteSpace(userCreated.UserId))
throw new ArgumentException("用户ID不能为空");
if (string.IsNullOrWhiteSpace(userCreated.Name))
throw new ArgumentException("用户名不能为空");
return new UserData
{
UserId = userCreated.UserId,
UserName = userCreated.Name,
Email = userCreated.Email ?? string.Empty,
Department = userCreated.Department ?? string.Empty,
EmployeeType = userCreated.EmployeeType ?? string.Empty,
CreatedAt = userCreated.CreatedAt
};
}
private async Task ProcessUserEventAsync(UserData user, CancellationToken cancellationToken)
{
await Task.Delay(100, cancellationToken);
if (string.IsNullOrWhiteSpace(user.UserId))
{
throw new ArgumentException("用户ID不能为空");
}
if (_options.EnableLogging)
_logger.LogInformation("📧 [用户事件] 发送欢迎通知给用户: {UserName} ({Email})",
user.UserName, user.Email);
await _notificationService.SendWelcomeEmailAsync(user, cancellationToken);
if (_options.EnableLogging)
_logger.LogInformation("⚙️ [用户事件] 创建用户配置文件: {UserId}", user.UserId);
await _eventService.CreateUserProfileAsync(user, cancellationToken);
if (_options.EnableLogging)
_logger.LogInformation("🔐 [用户事件] 初始化用户权限: {UserId}", user.UserId);
await _eventService.InitializeUserPermissionsAsync(user, cancellationToken);
await Task.CompletedTask;
}
}
4.2 继承预定义事件处理器
public class DemoDepartmentEventHandler : DepartmentCreatedEventHandler
{
private readonly DemoEventService _eventService;
private readonly INotificationService _notificationService;
private readonly IPermissionService _permissionService;
public DemoDepartmentEventHandler(
ILogger<DemoDepartmentEventHandler> logger,
DemoEventService eventService,
INotificationService notificationService,
IPermissionService permissionService) : base(logger)
{
_eventService = eventService ?? throw new ArgumentNullException(nameof(eventService));
_notificationService = notificationService ?? throw new ArgumentNullException(nameof(notificationService));
_permissionService = permissionService ?? throw new ArgumentNullException(nameof(permissionService));
}
protected override async Task ProcessBusinessLogicAsync(
EventData eventData,
ObjectEventResult<DepartmentCreatedResult>? departmentData,
CancellationToken cancellationToken = default)
{
if (eventData == null)
throw new ArgumentNullException(nameof(eventData));
_logger.LogInformation("[部门事件] 开始处理部门创建事件: {EventId}", eventData.EventId);
try
{
var department = ValidateDepartmentData(departmentData?.Object);
await _eventService.RecordDepartmentEventAsync(department, cancellationToken);
await ProcessDepartmentEventAsync(department, cancellationToken);
await InitializeDepartmentPermissionsAsync(department, cancellationToken);
_eventService.IncrementDepartmentCount();
_logger.LogInformation("[部门事件] 部门创建事件处理完成: 部门ID {DepartmentId}, 部门名 {DepartmentName}",
department.DepartmentId, department.Name);
}
catch (Exception ex)
{
_logger.LogError(ex, "[部门事件] 处理部门创建事件失败: {EventId}", eventData.EventId);
throw;
}
}
private DepartmentData ValidateDepartmentData(DepartmentCreatedResult? departmentResult)
{
if (departmentResult == null)
throw new ArgumentException("部门创建数据不能为空");
if (string.IsNullOrWhiteSpace(departmentResult.DepartmentId))
throw new ArgumentException("部门ID不能为空");
if (string.IsNullOrWhiteSpace(departmentResult.Name))
throw new ArgumentException("部门名不能为空");
return new DepartmentData
{
DepartmentId = departmentResult.DepartmentId,
Name = departmentResult.Name,
ParentDepartmentId = departmentResult.ParentDepartmentId,
LeaderUserId = departmentResult.LeaderUserId,
DepartmentLevel = departmentResult.DepartmentLevel,
CreatedAt = DateTime.UtcNow
};
}
private async Task ProcessDepartmentEventAsync(DepartmentData department, CancellationToken cancellationToken)
{
await Task.Delay(100, cancellationToken);
if (_options.EnableLogging)
_logger.LogInformation("[部门事件] 设置部门配置: {DepartmentName}", department.Name);
await _eventService.ConfigureDepartmentSettingsAsync(department, cancellationToken);
if (!string.IsNullOrWhiteSpace(department.LeaderUserId))
{
if (_options.EnableLogging)
_logger.LogInformation("[部门事件] 通知部门主管: {LeaderUserId}", department.LeaderUserId);
await _notificationService.NotifyDepartmentLeaderAsync(department, cancellationToken);
}
if (!string.IsNullOrWhiteSpace(department.ParentDepartmentId))
{
if (_options.EnableLogging)
_logger.LogInformation("[部门事件] 建立层级关系: {DepartmentId} -> {ParentDepartmentId}",
department.DepartmentId, department.ParentDepartmentId);
await _eventService.UpdateDepartmentHierarchyAsync(department, cancellationToken);
}
await Task.CompletedTask;
}
private async Task InitializeDepartmentPermissionsAsync(DepartmentData department, CancellationToken cancellationToken)
{
if (_options.EnableLogging)
_logger.LogInformation("[部门事件] 初始化部门权限: {DepartmentName}", department.Name);
var defaultPermissions = new[]
{
"department.view",
"department.edit",
"department.member.manage"
};
foreach (var permission in defaultPermissions)
{
await _permissionService.GrantPermissionAsync(
department.DepartmentId, permission, cancellationToken);
}
if (!string.IsNullOrWhiteSpace(department.LeaderUserId))
{
await _permissionService.GrantPermissionAsync(
department.DepartmentId, "department.admin", cancellationToken);
}
}
}
4.3 依赖注入配置
建造者模式应用
public static class FeishuWebSocketServiceExtensions
{
public static IFeishuWebSocketBuilder AddFeishuWebSocketBuilder(this IServiceCollection services)
{
return new FeishuWebSocketBuilder(services);
}
}
public class FeishuWebSocketBuilder
{
private readonly IServiceCollection _services;
private readonly List<Type> _handlerTypes = new();
private FeishuWebSocketOptions _options = new();
private bool _useMultiHandler = false;
public FeishuWebSocketBuilder(IServiceCollection services)
{
_services = services ?? throw new ArgumentNullException(nameof(services));
}
public FeishuWebSocketBuilder ConfigureFrom(IConfiguration configuration)
{
configuration.GetSection("Feishu:WebSocket").Bind(_options);
return this;
}
public FeishuWebSocketBuilder ConfigureOptions(Action<FeishuWebSocketOptions> configure)
{
configure?.Invoke(_options);
return this;
}
public FeishuWebSocketBuilder UseMultiHandler()
{
_useMultiHandler = true;
return this;
}
public FeishuWebSocketBuilder AddHandler<THandler>() where THandler : class, IFeishuEventHandler
{
_handlerTypes.Add(typeof(THandler));
return this;
}
public IServiceCollection Build()
{
_services.Configure(_options);
if (_useMultiHandler)
{
_services.AddSingleton<IFeishuEventHandlerFactory, DefaultFeishuEventHandlerFactory>();
foreach (var handlerType in _handlerTypes)
{
_services.AddSingleton(typeof(IFeishuEventHandler), handlerType);
}
_services.AddSingleton<IFeishuWebSocketManager, FeishuWebSocketManager>();
}
else
{
var handlerType = _handlerTypes.FirstOrDefault();
if (handlerType != null)
{
_services.AddSingleton(typeof(IFeishuEventHandler), handlerType);
_services.AddSingleton<IFeishuWebSocketManager, FeishuWebSocketManager>();
}
}
return _services;
}
}
实际使用示例
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddFeishuWebSocketBuilder()
.ConfigureFrom(builder.Configuration)
.UseMultiHandler()
.AddHandler<ReceiveMessageEventHandler>()
.AddHandler<UserCreatedEventHandler>()
.AddHandler<DepartmentCreatedEventHandler>()
.Build();
builder.Services.AddFeishuWebSocketServiceWithSingleHandler<ReceiveMessageEventHandler>(
options => {
options.AutoReconnect = true;
options.MaxReconnectAttempts = 5;
options.HeartbeatIntervalMs = 30000;
options.EnableLogging = true;
});
builder.Services.AddFeishuWebSocketService(builder.Configuration);
4.4 Web API集成
RESTful API设计
[ApiController]
[Route("api/[controller]")]
public class WebSocketController : ControllerBase
{
private readonly IFeishuWebSocketManager _webSocketManager;
private readonly ILogger<WebSocketController> _logger;
public WebSocketController(IFeishuWebSocketManager webSocketManager,
ILogger<WebSocketController> logger)
{
_webSocketManager = webSocketManager ?? throw new ArgumentNullException(nameof(webSocketManager));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
[HttpPost("connect")]
public async Task<IActionResult> ConnectAsync()
{
try
{
await _webSocketManager.StartAsync();
return Ok(new
{
Success = true,
Message = "WebSocket连接启动成功",
Timestamp = DateTime.UtcNow
});
}
catch (Exception ex)
{
_logger.LogError(ex, "启动WebSocket连接失败");
return BadRequest(new
{
Success = false,
Message = $"WebSocket连接启动失败: {ex.Message}",
Timestamp = DateTime.UtcNow
});
}
}
[HttpPost("disconnect")]
public async Task<IActionResult> DisconnectAsync()
{
try
{
await _webSocketManager.StopAsync();
return Ok(new
{
Success = true,
Message = "WebSocket连接断开成功",
Timestamp = DateTime.UtcNow
});
}
catch (Exception ex)
{
_logger.LogError(ex, "断开WebSocket连接失败");
return BadRequest(new
{
Success = false,
Message = $"WebSocket连接断开失败: {ex.Message}",
Timestamp = DateTime.UtcNow
});
}
}
[HttpGet("status")]
public IActionResult GetStatus()
{
var stats = _webSocketManager.GetConnectionStats();
var state = _webSocketManager.GetConnectionState();
return Ok(new WebSocketStatusResponse
{
IsConnected = _webSocketManager.IsConnected,
State = state.Status.ToString(),
Uptime = stats.Uptime,
ReconnectCount = stats.ReconnectCount,
LastError = stats.LastError?.Message,
Timestamp = DateTime.UtcNow
});
}
}
实时状态监控
public class WebSocketMonitoringService : IHostedService
{
private readonly IFeishuWebSocketManager _webSocketManager;
private readonly ILogger<WebSocketMonitoringService> _logger;
private readonly Timer _monitoringTimer;
private readonly ConcurrentQueue<ConnectionSnapshot> _snapshots = new();
public WebSocketMonitoringService(IFeishuWebSocketManager webSocketManager,
ILogger<WebSocketMonitoringService> logger)
{
_webSocketManager = webSocketManager;
_logger = logger;
_monitoringTimer = new Timer(CollectStatusSnapshot, null,
TimeSpan.Zero, TimeSpan.FromSeconds(30));
}
private void CollectStatusSnapshot(object? state)
{
var stats = _webSocketManager.GetConnectionStats();
var connectionState = _webSocketManager.GetConnectionState();
var snapshot = new ConnectionSnapshot
{
Timestamp = DateTime.UtcNow,
IsConnected = stats.IsConnected,
Uptime = stats.Uptime,
ReconnectCount = stats.ReconnectCount,
LastError = stats.LastError,
ConnectionState = connectionState.Status.ToString()
};
_snapshots.Enqueue(snapshot);
while (_snapshots.Count > 100)
{
_snapshots.TryDequeue(out _);
}
AnalyzeConnectionQuality();
}
private void AnalyzeConnectionQuality()
{
var recentSnapshots = _snapshots.TakeLast(10).ToList();
if (recentSnapshots.Count < 5) return;
var connectedCount = recentSnapshots.Count(s => s.IsConnected);
var connectivityRate = (double)connectedCount / recentSnapshots.Count;
if (connectivityRate < 0.9)
{
_logger.LogWarning("连接质量较差 - 连接率: {ConnectivityRate:P2}", connectivityRate);
}
var reconnectEvents = recentSnapshots
.Where(s => s.ReconnectCount > 0)
.ToList();
if (reconnectEvents.Count > 3)
{
_logger.LogWarning("重连频率过高 - 最近10次检测中有{Count}次重连",
reconnectEvents.Count);
}
}
}
🎯 总结
使用 Mud.Feishu.Abstractions 和 Mud.Feishu.WebSocket 两个核心项目构建企业级的飞书WebSocket长连接应用。通过组件化架构设计、策略模式的事件处理、完善的错误隔离机制和全面的监控体系,开发者可以快速构建稳定、可靠的实时事件处理系统。
总的来说,Mud.Feishu.Abstractions 和 Mud.Feishu.WebSocket 两个核心项目的目标是提供一个可靠、易于集成、高度可扩展的飞书WebSocket长连接应用框架,大大降低了飞书WebSocket集成的开发复杂度,让开发者能够专注于业务逻辑的实现。
转自https://www.cnblogs.com/mudtools/p/19320597
该文章在 2025/12/9 15:58:44 编辑过