WebSockets — это технология для двусторонней real-time связи между клиентом и сервером. В отличие от традиционного HTTP, WebSocket поддерживает постоянное соединение, которое позволяет серверу отправлять данные клиенту в любой момент. В этой статье мы реализуем надёжный WebSocket клиент во Flutter с нуля.
Что такое WebSockets?
WebSocket — это протокол связи, который обеспечивает постоянное двунаправленное соединение через одиночное TCP-соединение. После установки handshake-соединения обе стороны могут отправлять данные в любое время без накладных расходов HTTP.
Ключевые преимущества WebSockets
- Постоянное соединение — нет необходимости создавать новое соединение для каждого запроса
- Двусторонняя связь — сервер может инициировать отправку данных клиенту
- Низкие накладные расходы — минимизированный overhead по сравнению с HTTP polling
- Real-time обновления — мгновенная доставка сообщений
- Поддержка на всех платформах — работает в браузерах и мобильных приложениях
Когда использовать WebSockets
WebSockets идеально подходят для сценариев, где требуются real-time обновления:
- Чат-приложения и мессенджеры
- Онлайн-игры с многопользовательским режимом
- Real-time дашборды и аналитика
- Коллаборативные редакторы
- Push-уведомления без сторонних сервисов
- Live стриминг данных (котировки, графики)
Установка
Для работы с WebSockets во Flutter мы будем использовать пакет web_socket_channel — официальное решение от команды Flutter.
dependencies:
web_socket_channel: ^2.4.0
Добавьте зависимость в ваш pubspec.yaml и выполните flutter pub get.
Базовая реализация
Начнём с простой реализации WebSocket клиента. Мы создадим сервис, который управляет соединением и предоставляет Stream для получения сообщений.
import 'dart:async';
import 'package:web_socket_channel/web_socket_channel.dart';
class WebSocketService {
final WebSocketChannel _channel;
final StreamController<String> _messages = StreamController.broadcast();
WebSocketService(String url)
: _channel = WebSocketChannel.connect(url) {
// Подписываемся на входящие сообщения
_channel.stream.listen(
(data) {
_messages.add(data);
},
onError: (error) {
print('WebSocket error: $error');
},
onDone: () {
print('WebSocket connection closed');
},
);
}
// Stream для подписки на сообщения
Stream<String> get messages => _messages.stream;
// Отправка сообщения на сервер
void send(String message) {
_channel.sink.add(message);
}
// Закрытие соединения
void dispose() {
_channel.sink.close();
_messages.close();
}
}
В этой реализации:
— _channel управляет WebSocket соединением
— _messages — это broadcast Stream, который позволяет нескольким подписчикам получать сообщения
— Метод send отправляет данные на сервер
— Метод dispose корректно закрывает соединение и освобождает ресурсы
Обработка сообщений в UI
Теперь посмотрим, как использовать этот сервис в Flutter виджете. StreamBuilder идеально подходит для работы с WebSocket — он автоматически обновляет UI при получении новых сообщений.
class WebSocketChatWidget extends StatefulWidget {
final String wsUrl;
const WebSocketChatWidget({required this.wsUrl, super.key});
@override
State<WebSocketChatWidget> createState() => _WebSocketChatWidgetState();
}
class _WebSocketChatWidgetState extends State<WebSocketChatWidget> {
late WebSocketService _ws;
final TextEditingController _controller = TextEditingController();
final List<String> _messages = [];
@override
void initState() {
super.initState();
_ws = WebSocketService(widget.wsUrl);
}
@override
void dispose() {
_ws.dispose();
_controller.dispose();
super.dispose();
}
void _sendMessage() {
if (_controller.text.isNotEmpty) {
_ws.send(_controller.text);
_controller.clear();
}
}
@override
Widget build(BuildContext context) {
return Column(
children: [
Expanded(
child: StreamBuilder<String>(
stream: _ws.messages,
builder: (context, snapshot) {
if (snapshot.hasData) {
_messages.add(snapshot.data!);
}
if (_messages.isEmpty) {
return Center(child: Text('Нет сообщений'));
}
return ListView.builder(
itemCount: _messages.length,
itemBuilder: (context, index) {
return ListTile(
title: Text(_messages[index]),
);
},
);
},
),
),
Padding(
padding: EdgeInsets.all(8),
child: Row(
children: [
Expanded(
child: TextField(
controller: _controller,
decoration: InputDecoration(
hintText: 'Введите сообщение',
),
),
),
SizedBox(width: 8),
IconButton(
icon: Icon(Icons.send),
onPressed: _sendMessage,
),
],
),
),
],
);
}
}
Автоматическое переподключение
В реальных приложениях WebSocket соединение может разорваться из-за нестабильной сети. Нам нужна стратегия автоматического переподключения с экспоненциальным backoff.
Что такое Exponential Backoff?
Exponential backoff — это алгоритм повторных попыток, при котором интервал между попытками увеличивается экспоненциально. Например: 1с, 2с, 4с, 8с, 16с. Это предотвращает перегрузку сервера и даёт время сети восстановиться.
import 'dart:async';
import 'dart:math';
import 'package:web_socket_channel/web_socket_channel.dart';
class ReconnectingWebSocket {
final String url;
WebSocketChannel? _channel;
Timer? _reconnectTimer;
int _retryCount = 0;
// Максимальное количество попыток
static const _maxRetries = 5;
// Базовая задержка
static const _baseDelay = Duration(seconds: 1);
// Максимальная задержка
static const _maxDelay = Duration(seconds: 30);
void connect() {
_channel = WebSocketChannel.connect(url);
_channel!.stream.listen(
(data) {
// Соединение восстановлено, сбрасываем счётчик
_retryCount = 0;
print('Received: $data');
},
onError: (error) {
print('WebSocket error: $error');
_scheduleReconnect();
},
onDone: () {
print('WebSocket closed');
_scheduleReconnect();
},
);
}
void _scheduleReconnect() {
// Отменяем предыдущий таймер, если есть
_reconnectTimer?.cancel();
// Вычисляем задержку с экспоненциальным backoff
final delay = _calculateBackoffDelay();
print('Reconnecting in ${delay.inSeconds} seconds...');
_reconnectTimer = Timer(delay, () {
if (_retryCount < _maxRetries) {
_retryCount++;
print('Reconnection attempt $_retryCount');
connect();
} else {
print('Max reconnection attempts reached');
}
});
}
Duration _calculateBackoffDelay() {
// Экспоненциальный рост: 1s, 2s, 4s, 8s, 16s...
final seconds = min(
pow(2, _retryCount).toInt() * _baseDelay.inSeconds,
_maxDelay.inSeconds,
);
// Добавляем небольшую случайность (jitter)
final jitter = (seconds * 0.1).toInt();
final randomSeconds = seconds + (Random().nextInt(jitter * 2) - jitter);
return Duration(seconds: randomSeconds);
}
void send(String message) {
_channel?.sink.add(message);
}
void disconnect() {
_reconnectTimer?.cancel();
_channel?.sink.close();
}
}
Ключевые моменты этой реализации:
- Экспоненциальный backoff — задержка растёт экспоненциально (1s → 2s → 4s → 8s)
- Максимальная задержка — ограничиваем задержку до 30 секунд
- Jitter — добавляем небольшую случайность, чтобы избежать «thundering herd» проблемы
- Счётчик попыток — отслеживаем количество попыток и прекращаем после максимума
- Сброс счётчика — при успешном соединении сбрасываем счётчик повторных попыток
Heartbeat (Ping/Pong)
Долгое простаивающее соединение может быть разорвано промежуточными прокси-серверами или NAT. Чтобы предотвратить это, мы отправляем периодические ping-сообщения (heartbeat).
import 'dart:async';
import 'package:web_socket_channel/web_socket_channel.dart';
class HeartbeatWebSocket {
final String url;
WebSocketChannel? _channel;
Timer? _heartbeatTimer;
Timer? _timeoutTimer;
// Интервал heartbeat (30 секунд)
static const _heartbeatInterval = Duration(seconds: 30);
// Таймаут ожидания pong (10 секунд)
static const _pongTimeout = Duration(seconds: 10);
bool _waitingForPong = false;
void connect() {
_channel = WebSocketChannel.connect(url);
// Подписываемся на входящие сообщения
_channel!.stream.listen(
(data) {
if (data == 'pong') {
// Получили ответ на heartbeat
_waitingForPong = false;
_timeoutTimer?.cancel();
} else {
// Обычное сообщение
print('Received: $data');
}
},
onError: (error) {
print('WebSocket error: $error');
_stopHeartbeat();
},
onDone: () {
print('WebSocket closed');
_stopHeartbeat();
},
);
// Запускаем heartbeat
_startHeartbeat();
}
void _startHeartbeat() {
_heartbeatTimer = Timer.periodic(_heartbeatInterval, (_) {
_sendPing();
});
}
void _sendPing() {
if (_waitingForPong) {
// Не получили pong на предыдущий ping
print('Pong timeout, reconnecting...');
_stopHeartbeat();
_channel?.sink.close();
connect();
return;
}
print('Sending ping...');
_channel?.sink.add('ping');
_waitingForPong = true;
// Устанавливаем таймаут для ожидания pong
_timeoutTimer?.cancel();
_timeoutTimer = Timer(_pongTimeout, () {
if (_waitingForPong) {
print('Pong timeout');
_stopHeartbeat();
_channel?.sink.close();
connect();
}
});
}
void _stopHeartbeat() {
_heartbeatTimer?.cancel();
_timeoutTimer?.cancel();
}
void send(String message) {
_channel?.sink.add(message);
}
void dispose() {
_stopHeartbeat();
_channel?.sink.close();
}
}
Эта реализация heartbeat:
- Отправляет ping каждые 30 секунд для поддержания соединения
- Ожидает pong ответ в течение 10 секунд
- Переподключается если pong не получен
- Останавливает heartbeat при разрыве соединения
- Предотвращает дубликаты — не отправляет новый ping пока ожидается pong
Управление состоянием соединения
Для production кода важно отслеживать состояние соединения и уведомлять UI об изменениях.
enum ConnectionState {
connecting,
connected,
disconnected,
reconnecting,
error,
}
class ManagedWebSocket {
final String url;
WebSocketChannel? _channel;
final StreamController<ConnectionState> _stateController =
StreamController.broadcast();
final StreamController<String> _messageController = StreamController.broadcast();
Timer? _reconnectTimer;
int _retryCount = 0;
static const _maxRetries = 5;
// Stream для подписки на изменения состояния
Stream<ConnectionState> get stateStream => _stateController.stream;
// Stream для получения сообщений
Stream<String> get messageStream => _messageController.stream;
// Текущее состояние
ConnectionState _currentState = ConnectionState.disconnected;
ConnectionState get currentState => _currentState;
void connect() {
if (_currentState == ConnectionState.connecting ||
_currentState == ConnectionState.connected) {
return;
}
_updateState(ConnectionState.connecting);
_channel = WebSocketChannel.connect(url);
_channel!.stream.listen(
(data) {
_updateState(ConnectionState.connected);
_retryCount = 0;
_messageController.add(data);
},
onError: (error) {
_updateState(ConnectionState.error);
print('WebSocket error: $error');
_scheduleReconnect();
},
onDone: () {
_updateState(ConnectionState.disconnected);
print('WebSocket closed');
_scheduleReconnect();
},
);
}
void _updateState(ConnectionState state) {
_currentState = state;
_stateController.add(state);
}
void _scheduleReconnect() {
_reconnectTimer?.cancel();
if (_retryCount >= _maxRetries) {
_updateState(ConnectionState.error);
return;
}
_updateState(ConnectionState.reconnecting);
final delay = Duration(seconds: pow(2, _retryCount).toInt());
_reconnectTimer = Timer(delay, () {
_retryCount++;
connect();
});
}
void send(String message) {
if (_currentState == ConnectionState.connected) {
_channel?.sink.add(message);
} else {
print('Cannot send message: not connected');
}
}
void dispose() {
_reconnectTimer?.cancel();
_channel?.sink.close();
_stateController.close();
_messageController.close();
}
}
Оптимизация трафика
Для мобильных приложений важно минимизировать использование трафика. Рассмотрим несколько техник оптимизации.
Сжатие данных
Отправляйте данные в сжатом формате. Вместо JSON используйте MessagePack или Protocol Buffers.
// Вместо JSON
final jsonData = jsonEncode({'type': 'message', 'text': 'Hello'});
// Размер: ~35 байт
// Используйте компактный формат
final compactData = 'MSG|Hello'; // Ваш формат
// Размер: ~8 байт
Батчинг сообщений
Объединяйте несколько сообщений в один пакет:
class MessageBatcher {
final List<String> _batch = [];
final WebSocketService _ws;
Timer? _batchTimer;
MessageBatcher(this._ws);
void add(String message) {
_batch.add(message);
// Отправляем батч каждые 100мс или когда накоплено 10 сообщений
if (_batch.length >= 10) {
_flush();
} else {
_scheduleFlush();
}
}
void _scheduleFlush() {
_batchTimer?.cancel();
_batchTimer = Timer(Duration(milliseconds: 100), _flush);
}
void _flush() {
if (_batch.isEmpty) return;
final batch = jsonEncode({'messages': _batch});
_ws.send(batch);
_batch.clear();
}
}
Заключение
WebSockets — это мощный инструмент для создания real-time приложений на Flutter. При реализации WebSocket клиента в production обязательно учитывайте:
- Автоматическое переподключение с экспоненциальным backoff
- Heartbeat (ping/pong) для поддержания соединения
- Управление состоянием соединения и уведомление UI
- Оптимизацию трафика через сжатие и батчинг
- Правильную очистку ресурсов в методе dispose
Используйте эти практики для создания надёжных real-time приложений: чатов, онлайн-игр, collaborative tools и других сценариев, где важна мгновенная доставка сообщений.