Рубрики
Flutter

WebSockets во Flutter: Real-time с нуля до продакшена

Полное руководство по WebSockets во Flutter: реализация, reconnection strategies, backoff, оптимизация трафика.

WebSockets — это технология для двусторонней real-time связи между клиентом и сервером. В отличие от традиционного HTTP, WebSocket поддерживает постоянное соединение, которое позволяет серверу отправлять данные клиенту в любой момент. В этой статье мы реализуем надёжный WebSocket клиент во Flutter с нуля.

Что такое WebSockets?

WebSocket — это протокол связи, который обеспечивает постоянное двунаправленное соединение через одиночное TCP-соединение. После установки handshake-соединения обе стороны могут отправлять данные в любое время без накладных расходов HTTP.

Ключевые преимущества WebSockets

  • Постоянное соединение — нет необходимости создавать новое соединение для каждого запроса
  • Двусторонняя связь — сервер может инициировать отправку данных клиенту
  • Низкие накладные расходы — минимизированный overhead по сравнению с HTTP polling
  • Real-time обновления — мгновенная доставка сообщений
  • Поддержка на всех платформах — работает в браузерах и мобильных приложениях

Когда использовать WebSockets

WebSockets идеально подходят для сценариев, где требуются real-time обновления:

  • Чат-приложения и мессенджеры
  • Онлайн-игры с многопользовательским режимом
  • Real-time дашборды и аналитика
  • Коллаборативные редакторы
  • Push-уведомления без сторонних сервисов
  • Live стриминг данных (котировки, графики)

Установка

Для работы с WebSockets во Flutter мы будем использовать пакет web_socket_channel — официальное решение от команды Flutter.

dependencies:
  web_socket_channel: ^2.4.0

Добавьте зависимость в ваш pubspec.yaml и выполните flutter pub get.

Базовая реализация

Начнём с простой реализации WebSocket клиента. Мы создадим сервис, который управляет соединением и предоставляет Stream для получения сообщений.

import 'dart:async';
import 'package:web_socket_channel/web_socket_channel.dart';

class WebSocketService {
  final WebSocketChannel _channel;
  final StreamController<String> _messages = StreamController.broadcast();

  WebSocketService(String url)
      : _channel = WebSocketChannel.connect(url) {
    // Подписываемся на входящие сообщения
    _channel.stream.listen(
      (data) {
        _messages.add(data);
      },
      onError: (error) {
        print('WebSocket error: $error');
      },
      onDone: () {
        print('WebSocket connection closed');
      },
    );
  }

  // Stream для подписки на сообщения
  Stream<String> get messages => _messages.stream;

  // Отправка сообщения на сервер
  void send(String message) {
    _channel.sink.add(message);
  }

  // Закрытие соединения
  void dispose() {
    _channel.sink.close();
    _messages.close();
  }
}

В этой реализации: — _channel управляет WebSocket соединением — _messages — это broadcast Stream, который позволяет нескольким подписчикам получать сообщения — Метод send отправляет данные на сервер — Метод dispose корректно закрывает соединение и освобождает ресурсы

Обработка сообщений в UI

Теперь посмотрим, как использовать этот сервис в Flutter виджете. StreamBuilder идеально подходит для работы с WebSocket — он автоматически обновляет UI при получении новых сообщений.

class WebSocketChatWidget extends StatefulWidget {
  final String wsUrl;

  const WebSocketChatWidget({required this.wsUrl, super.key});

  @override
  State<WebSocketChatWidget> createState() => _WebSocketChatWidgetState();
}

class _WebSocketChatWidgetState extends State<WebSocketChatWidget> {
  late WebSocketService _ws;
  final TextEditingController _controller = TextEditingController();
  final List<String> _messages = [];

  @override
  void initState() {
    super.initState();
    _ws = WebSocketService(widget.wsUrl);
  }

  @override
  void dispose() {
    _ws.dispose();
    _controller.dispose();
    super.dispose();
  }

  void _sendMessage() {
    if (_controller.text.isNotEmpty) {
      _ws.send(_controller.text);
      _controller.clear();
    }
  }

  @override
  Widget build(BuildContext context) {
    return Column(
      children: [
        Expanded(
          child: StreamBuilder<String>(
            stream: _ws.messages,
            builder: (context, snapshot) {
              if (snapshot.hasData) {
                _messages.add(snapshot.data!);
              }

              if (_messages.isEmpty) {
                return Center(child: Text('Нет сообщений'));
              }

              return ListView.builder(
                itemCount: _messages.length,
                itemBuilder: (context, index) {
                  return ListTile(
                    title: Text(_messages[index]),
                  );
                },
              );
            },
          ),
        ),
        Padding(
          padding: EdgeInsets.all(8),
          child: Row(
            children: [
              Expanded(
                child: TextField(
                  controller: _controller,
                  decoration: InputDecoration(
                    hintText: 'Введите сообщение',
                  ),
                ),
              ),
              SizedBox(width: 8),
              IconButton(
                icon: Icon(Icons.send),
                onPressed: _sendMessage,
              ),
            ],
          ),
        ),
      ],
    );
  }
}

Автоматическое переподключение

В реальных приложениях WebSocket соединение может разорваться из-за нестабильной сети. Нам нужна стратегия автоматического переподключения с экспоненциальным backoff.

Что такое Exponential Backoff?

Exponential backoff — это алгоритм повторных попыток, при котором интервал между попытками увеличивается экспоненциально. Например: 1с, 2с, 4с, 8с, 16с. Это предотвращает перегрузку сервера и даёт время сети восстановиться.

import 'dart:async';
import 'dart:math';
import 'package:web_socket_channel/web_socket_channel.dart';

class ReconnectingWebSocket {
  final String url;
  WebSocketChannel? _channel;
  Timer? _reconnectTimer;
  int _retryCount = 0;

  // Максимальное количество попыток
  static const _maxRetries = 5;

  // Базовая задержка
  static const _baseDelay = Duration(seconds: 1);

  // Максимальная задержка
  static const _maxDelay = Duration(seconds: 30);

  void connect() {
    _channel = WebSocketChannel.connect(url);

    _channel!.stream.listen(
      (data) {
        // Соединение восстановлено, сбрасываем счётчик
        _retryCount = 0;
        print('Received: $data');
      },
      onError: (error) {
        print('WebSocket error: $error');
        _scheduleReconnect();
      },
      onDone: () {
        print('WebSocket closed');
        _scheduleReconnect();
      },
    );
  }

  void _scheduleReconnect() {
    // Отменяем предыдущий таймер, если есть
    _reconnectTimer?.cancel();

    // Вычисляем задержку с экспоненциальным backoff
    final delay = _calculateBackoffDelay();

    print('Reconnecting in ${delay.inSeconds} seconds...');

    _reconnectTimer = Timer(delay, () {
      if (_retryCount < _maxRetries) {
        _retryCount++;
        print('Reconnection attempt $_retryCount');
        connect();
      } else {
        print('Max reconnection attempts reached');
      }
    });
  }

  Duration _calculateBackoffDelay() {
    // Экспоненциальный рост: 1s, 2s, 4s, 8s, 16s...
    final seconds = min(
      pow(2, _retryCount).toInt() * _baseDelay.inSeconds,
      _maxDelay.inSeconds,
    );

    // Добавляем небольшую случайность (jitter)
    final jitter = (seconds * 0.1).toInt();
    final randomSeconds = seconds + (Random().nextInt(jitter * 2) - jitter);

    return Duration(seconds: randomSeconds);
  }

  void send(String message) {
    _channel?.sink.add(message);
  }

  void disconnect() {
    _reconnectTimer?.cancel();
    _channel?.sink.close();
  }
}

Ключевые моменты этой реализации:

  1. Экспоненциальный backoff — задержка растёт экспоненциально (1s → 2s → 4s → 8s)
  2. Максимальная задержка — ограничиваем задержку до 30 секунд
  3. Jitter — добавляем небольшую случайность, чтобы избежать «thundering herd» проблемы
  4. Счётчик попыток — отслеживаем количество попыток и прекращаем после максимума
  5. Сброс счётчика — при успешном соединении сбрасываем счётчик повторных попыток

Heartbeat (Ping/Pong)

Долгое простаивающее соединение может быть разорвано промежуточными прокси-серверами или NAT. Чтобы предотвратить это, мы отправляем периодические ping-сообщения (heartbeat).

import 'dart:async';
import 'package:web_socket_channel/web_socket_channel.dart';

class HeartbeatWebSocket {
  final String url;
  WebSocketChannel? _channel;
  Timer? _heartbeatTimer;
  Timer? _timeoutTimer;

  // Интервал heartbeat (30 секунд)
  static const _heartbeatInterval = Duration(seconds: 30);

  // Таймаут ожидания pong (10 секунд)
  static const _pongTimeout = Duration(seconds: 10);

  bool _waitingForPong = false;

  void connect() {
    _channel = WebSocketChannel.connect(url);

    // Подписываемся на входящие сообщения
    _channel!.stream.listen(
      (data) {
        if (data == 'pong') {
          // Получили ответ на heartbeat
          _waitingForPong = false;
          _timeoutTimer?.cancel();
        } else {
          // Обычное сообщение
          print('Received: $data');
        }
      },
      onError: (error) {
        print('WebSocket error: $error');
        _stopHeartbeat();
      },
      onDone: () {
        print('WebSocket closed');
        _stopHeartbeat();
      },
    );

    // Запускаем heartbeat
    _startHeartbeat();
  }

  void _startHeartbeat() {
    _heartbeatTimer = Timer.periodic(_heartbeatInterval, (_) {
      _sendPing();
    });
  }

  void _sendPing() {
    if (_waitingForPong) {
      // Не получили pong на предыдущий ping
      print('Pong timeout, reconnecting...');
      _stopHeartbeat();
      _channel?.sink.close();
      connect();
      return;
    }

    print('Sending ping...');
    _channel?.sink.add('ping');
    _waitingForPong = true;

    // Устанавливаем таймаут для ожидания pong
    _timeoutTimer?.cancel();
    _timeoutTimer = Timer(_pongTimeout, () {
      if (_waitingForPong) {
        print('Pong timeout');
        _stopHeartbeat();
        _channel?.sink.close();
        connect();
      }
    });
  }

  void _stopHeartbeat() {
    _heartbeatTimer?.cancel();
    _timeoutTimer?.cancel();
  }

  void send(String message) {
    _channel?.sink.add(message);
  }

  void dispose() {
    _stopHeartbeat();
    _channel?.sink.close();
  }
}

Эта реализация heartbeat:

  1. Отправляет ping каждые 30 секунд для поддержания соединения
  2. Ожидает pong ответ в течение 10 секунд
  3. Переподключается если pong не получен
  4. Останавливает heartbeat при разрыве соединения
  5. Предотвращает дубликаты — не отправляет новый ping пока ожидается pong

Управление состоянием соединения

Для production кода важно отслеживать состояние соединения и уведомлять UI об изменениях.

enum ConnectionState {
  connecting,
  connected,
  disconnected,
  reconnecting,
  error,
}

class ManagedWebSocket {
  final String url;

  WebSocketChannel? _channel;
  final StreamController<ConnectionState> _stateController =
      StreamController.broadcast();
  final StreamController<String> _messageController = StreamController.broadcast();

  Timer? _reconnectTimer;
  int _retryCount = 0;
  static const _maxRetries = 5;

  // Stream для подписки на изменения состояния
  Stream<ConnectionState> get stateStream => _stateController.stream;

  // Stream для получения сообщений
  Stream<String> get messageStream => _messageController.stream;

  // Текущее состояние
  ConnectionState _currentState = ConnectionState.disconnected;

  ConnectionState get currentState => _currentState;

  void connect() {
    if (_currentState == ConnectionState.connecting ||
        _currentState == ConnectionState.connected) {
      return;
    }

    _updateState(ConnectionState.connecting);

    _channel = WebSocketChannel.connect(url);

    _channel!.stream.listen(
      (data) {
        _updateState(ConnectionState.connected);
        _retryCount = 0;
        _messageController.add(data);
      },
      onError: (error) {
        _updateState(ConnectionState.error);
        print('WebSocket error: $error');
        _scheduleReconnect();
      },
      onDone: () {
        _updateState(ConnectionState.disconnected);
        print('WebSocket closed');
        _scheduleReconnect();
      },
    );
  }

  void _updateState(ConnectionState state) {
    _currentState = state;
    _stateController.add(state);
  }

  void _scheduleReconnect() {
    _reconnectTimer?.cancel();

    if (_retryCount >= _maxRetries) {
      _updateState(ConnectionState.error);
      return;
    }

    _updateState(ConnectionState.reconnecting);

    final delay = Duration(seconds: pow(2, _retryCount).toInt());

    _reconnectTimer = Timer(delay, () {
      _retryCount++;
      connect();
    });
  }

  void send(String message) {
    if (_currentState == ConnectionState.connected) {
      _channel?.sink.add(message);
    } else {
      print('Cannot send message: not connected');
    }
  }

  void dispose() {
    _reconnectTimer?.cancel();
    _channel?.sink.close();
    _stateController.close();
    _messageController.close();
  }
}

Оптимизация трафика

Для мобильных приложений важно минимизировать использование трафика. Рассмотрим несколько техник оптимизации.

Сжатие данных

Отправляйте данные в сжатом формате. Вместо JSON используйте MessagePack или Protocol Buffers.

// Вместо JSON
final jsonData = jsonEncode({'type': 'message', 'text': 'Hello'});
// Размер: ~35 байт

// Используйте компактный формат
final compactData = 'MSG|Hello'; // Ваш формат
// Размер: ~8 байт

Батчинг сообщений

Объединяйте несколько сообщений в один пакет:

class MessageBatcher {
  final List<String> _batch = [];
  final WebSocketService _ws;
  Timer? _batchTimer;

  MessageBatcher(this._ws);

  void add(String message) {
    _batch.add(message);

    // Отправляем батч каждые 100мс или когда накоплено 10 сообщений
    if (_batch.length >= 10) {
      _flush();
    } else {
      _scheduleFlush();
    }
  }

  void _scheduleFlush() {
    _batchTimer?.cancel();
    _batchTimer = Timer(Duration(milliseconds: 100), _flush);
  }

  void _flush() {
    if (_batch.isEmpty) return;

    final batch = jsonEncode({'messages': _batch});
    _ws.send(batch);

    _batch.clear();
  }
}

Заключение

WebSockets — это мощный инструмент для создания real-time приложений на Flutter. При реализации WebSocket клиента в production обязательно учитывайте:

  • Автоматическое переподключение с экспоненциальным backoff
  • Heartbeat (ping/pong) для поддержания соединения
  • Управление состоянием соединения и уведомление UI
  • Оптимизацию трафика через сжатие и батчинг
  • Правильную очистку ресурсов в методе dispose

Используйте эти практики для создания надёжных real-time приложений: чатов, онлайн-игр, collaborative tools и других сценариев, где важна мгновенная доставка сообщений.