Home » Сокет-программирование в Python — пример сервера и клиента
Сокет-программирование в Python — пример сервера и клиента

Сокет-программирование в Python — пример сервера и клиента

Сокет-программирование — это то, о чём многие из нас слышали ещё в вузе, но потом благополучно забыли, погружаясь в мир REST API и высокоуровневых библиотек. Но знаете что? Понимание сокетов — это как знание SQL для программиста или умение работать с командной строкой для сисадмина. Базовый навык, который делает тебя более уверенным в своих действиях.

Сегодня разберём, как создать простой сервер и клиент на Python, используя встроенную библиотеку socket. Это особенно полезно для тех, кто работает с серверами, настраивает мониторинг, создаёт собственные системы логирования или просто хочет понять, как работает сетевое взаимодействие под капотом. Плюс — это отличная основа для создания собственных протоколов или простых микросервисов.

Как это работает под капотом

Сокет — это точка входа для сетевого взаимодействия. Представьте себе телефонную связь: один абонент (сервер) ждёт звонка на определённом номере (порту), а другой (клиент) набирает этот номер и устанавливает соединение. После этого они могут обмениваться данными.

В Python работа с сокетами происходит через модуль socket, который предоставляет низкоуровневый интерфейс для работы с сетевыми соединениями. Основные типы сокетов:

  • TCP (SOCK_STREAM) — надёжный, с установкой соединения
  • UDP (SOCK_DGRAM) — быстрый, без установки соединения

Сегодня сосредоточимся на TCP, поскольку он гарантирует доставку данных и их порядок — именно то, что нужно для большинства серверных приложений.

Создаём простой TCP-сервер

Начнём с базового сервера, который будет слушать подключения и отвечать клиентам:

import socket
import threading

def handle_client(client_socket, address):
    """Обработка подключения клиента"""
    print(f"Подключился клиент {address}")
    
    try:
        while True:
            # Получаем данные от клиента
            data = client_socket.recv(1024)
            if not data:
                break
            
            message = data.decode('utf-8')
            print(f"Получено от {address}: {message}")
            
            # Отправляем ответ
            response = f"Сервер получил: {message}"
            client_socket.send(response.encode('utf-8'))
            
    except Exception as e:
        print(f"Ошибка при работе с клиентом {address}: {e}")
    finally:
        client_socket.close()
        print(f"Клиент {address} отключился")

def start_server(host='localhost', port=8888):
    """Запуск сервера"""
    # Создаём сокет
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # Позволяем переиспользовать адрес
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    try:
        # Привязываем сокет к адресу и порту
        server_socket.bind((host, port))
        
        # Переводим сокет в режим прослушивания
        server_socket.listen(5)
        print(f"Сервер запущен на {host}:{port}")
        
        while True:
            # Ждём подключения
            client_socket, address = server_socket.accept()
            
            # Создаём поток для обработки клиента
            client_thread = threading.Thread(
                target=handle_client,
                args=(client_socket, address)
            )
            client_thread.daemon = True
            client_thread.start()
            
    except KeyboardInterrupt:
        print("\nОстановка сервера...")
    except Exception as e:
        print(f"Ошибка сервера: {e}")
    finally:
        server_socket.close()

if __name__ == "__main__":
    start_server()

Создаём клиент

Теперь клиент, который будет подключаться к серверу и отправлять сообщения:

import socket
import threading
import time

def receive_messages(client_socket):
    """Получение сообщений от сервера"""
    try:
        while True:
            response = client_socket.recv(1024)
            if not response:
                break
            print(f"Ответ сервера: {response.decode('utf-8')}")
    except Exception as e:
        print(f"Ошибка при получении данных: {e}")

def start_client(host='localhost', port=8888):
    """Запуск клиента"""
    try:
        # Создаём сокет
        client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        
        # Подключаемся к серверу
        client_socket.connect((host, port))
        print(f"Подключён к серверу {host}:{port}")
        
        # Запускаем поток для получения сообщений
        receive_thread = threading.Thread(target=receive_messages, args=(client_socket,))
        receive_thread.daemon = True
        receive_thread.start()
        
        # Отправляем сообщения
        while True:
            message = input()
            if message.lower() == 'exit':
                break
            client_socket.send(message.encode('utf-8'))
            
    except Exception as e:
        print(f"Ошибка клиента: {e}")
    finally:
        client_socket.close()

if __name__ == "__main__":
    start_client()

Тестируем наше решение

Для тестирования сохраните код сервера в файл server.py, а клиента в client.py. Затем:

# В первом терминале запускаем сервер
python server.py

# Во втором терминале запускаем клиент
python client.py

# Можно запустить несколько клиентов одновременно
python client.py

Попробуйте отправить несколько сообщений, подключить несколько клиентов — сервер должен обрабатывать их параллельно.

Продвинутый сервер с логированием

Теперь создадим более серьёзный сервер с логированием и обработкой ошибок:

import socket
import threading
import logging
import json
import time
from datetime import datetime

# Настройка логирования
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

class SocketServer:
    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = port
        self.socket = None
        self.clients = {}
        self.running = False
        
    def start(self):
        """Запуск сервера"""
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        
        try:
            self.socket.bind((self.host, self.port))
            self.socket.listen(10)
            self.running = True
            
            logging.info(f"Сервер запущен на {self.host}:{self.port}")
            
            while self.running:
                try:
                    client_socket, address = self.socket.accept()
                    client_thread = threading.Thread(
                        target=self.handle_client,
                        args=(client_socket, address)
                    )
                    client_thread.daemon = True
                    client_thread.start()
                except socket.error as e:
                    if self.running:
                        logging.error(f"Ошибка принятия соединения: {e}")
                        
        except Exception as e:
            logging.error(f"Ошибка запуска сервера: {e}")
        finally:
            self.stop()
            
    def handle_client(self, client_socket, address):
        """Обработка клиента"""
        client_id = f"{address[0]}:{address[1]}"
        self.clients[client_id] = {
            'socket': client_socket,
            'address': address,
            'connected_at': datetime.now(),
            'messages_count': 0
        }
        
        logging.info(f"Подключился клиент {client_id}")
        
        try:
            # Отправляем приветствие
            welcome_msg = {
                'type': 'welcome',
                'message': f'Добро пожаловать! Вы подключены как {client_id}',
                'timestamp': datetime.now().isoformat()
            }
            self.send_message(client_socket, welcome_msg)
            
            while self.running:
                data = client_socket.recv(1024)
                if not data:
                    break
                    
                try:
                    # Пытаемся распарсить JSON
                    message = json.loads(data.decode('utf-8'))
                    self.process_message(client_socket, client_id, message)
                except json.JSONDecodeError:
                    # Если не JSON, обрабатываем как обычный текст
                    text_message = data.decode('utf-8')
                    self.process_text_message(client_socket, client_id, text_message)
                    
        except Exception as e:
            logging.error(f"Ошибка при работе с клиентом {client_id}: {e}")
        finally:
            self.disconnect_client(client_id)
            
    def process_message(self, client_socket, client_id, message):
        """Обработка JSON сообщения"""
        msg_type = message.get('type', 'text')
        
        if msg_type == 'ping':
            response = {
                'type': 'pong',
                'timestamp': datetime.now().isoformat()
            }
            self.send_message(client_socket, response)
            
        elif msg_type == 'stats':
            stats = {
                'type': 'stats',
                'connected_clients': len(self.clients),
                'your_messages': self.clients[client_id]['messages_count'],
                'server_uptime': str(datetime.now() - self.clients[client_id]['connected_at'])
            }
            self.send_message(client_socket, stats)
            
        elif msg_type == 'broadcast':
            # Отправляем сообщение всем клиентам
            broadcast_msg = {
                'type': 'broadcast',
                'from': client_id,
                'message': message.get('message', ''),
                'timestamp': datetime.now().isoformat()
            }
            self.broadcast_message(broadcast_msg, exclude=client_id)
            
        else:
            self.process_text_message(client_socket, client_id, message.get('message', ''))
            
    def process_text_message(self, client_socket, client_id, text):
        """Обработка текстового сообщения"""
        self.clients[client_id]['messages_count'] += 1
        
        logging.info(f"Сообщение от {client_id}: {text}")
        
        response = {
            'type': 'echo',
            'message': f"Получено: {text}",
            'timestamp': datetime.now().isoformat()
        }
        
        self.send_message(client_socket, response)
        
    def send_message(self, client_socket, message):
        """Отправка сообщения клиенту"""
        try:
            if isinstance(message, dict):
                message = json.dumps(message)
            client_socket.send(message.encode('utf-8'))
        except Exception as e:
            logging.error(f"Ошибка отправки сообщения: {e}")
            
    def broadcast_message(self, message, exclude=None):
        """Отправка сообщения всем клиентам"""
        for client_id, client_info in self.clients.copy().items():
            if client_id != exclude:
                try:
                    self.send_message(client_info['socket'], message)
                except:
                    self.disconnect_client(client_id)
                    
    def disconnect_client(self, client_id):
        """Отключение клиента"""
        if client_id in self.clients:
            try:
                self.clients[client_id]['socket'].close()
            except:
                pass
            del self.clients[client_id]
            logging.info(f"Клиент {client_id} отключён")
            
    def stop(self):
        """Остановка сервера"""
        self.running = False
        if self.socket:
            self.socket.close()
        logging.info("Сервер остановлен")

if __name__ == "__main__":
    server = SocketServer()
    try:
        server.start()
    except KeyboardInterrupt:
        print("\nОстановка сервера...")
        server.stop()

Производительность и сравнение подходов

Давайте сравним различные подходы к созданию сокет-серверов:

Подход Преимущества Недостатки Производительность
Базовый сервер Простота реализации Только один клиент Низкая
Threading Несколько клиентов Ограничения GIL Средняя
Asyncio Высокая производительность Сложность кода Высокая
Multiprocessing Истинная параллельность Больше ресурсов Высокая

Asyncio версия для высокой нагрузки

Для серверов с высокой нагрузкой лучше использовать asyncio:

import asyncio
import json
import logging
from datetime import datetime

class AsyncSocketServer:
    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = port
        self.clients = {}
        
    async def handle_client(self, reader, writer):
        """Обработка клиента"""
        client_address = writer.get_extra_info('peername')
        client_id = f"{client_address[0]}:{client_address[1]}"
        
        self.clients[client_id] = {
            'writer': writer,
            'connected_at': datetime.now(),
            'messages_count': 0
        }
        
        logging.info(f"Подключился клиент {client_id}")
        
        try:
            welcome_msg = {
                'type': 'welcome',
                'message': f'Добро пожаловать! Вы подключены как {client_id}',
                'timestamp': datetime.now().isoformat()
            }
            await self.send_message(writer, welcome_msg)
            
            while True:
                data = await reader.read(1024)
                if not data:
                    break
                    
                try:
                    message = json.loads(data.decode('utf-8'))
                    await self.process_message(writer, client_id, message)
                except json.JSONDecodeError:
                    text_message = data.decode('utf-8')
                    await self.process_text_message(writer, client_id, text_message)
                    
        except Exception as e:
            logging.error(f"Ошибка при работе с клиентом {client_id}: {e}")
        finally:
            await self.disconnect_client(client_id)
            
    async def process_message(self, writer, client_id, message):
        """Обработка JSON сообщения"""
        msg_type = message.get('type', 'text')
        
        if msg_type == 'ping':
            response = {
                'type': 'pong',
                'timestamp': datetime.now().isoformat()
            }
            await self.send_message(writer, response)
            
        elif msg_type == 'stats':
            stats = {
                'type': 'stats',
                'connected_clients': len(self.clients),
                'your_messages': self.clients[client_id]['messages_count'],
            }
            await self.send_message(writer, stats)
            
        else:
            await self.process_text_message(writer, client_id, message.get('message', ''))
            
    async def process_text_message(self, writer, client_id, text):
        """Обработка текстового сообщения"""
        self.clients[client_id]['messages_count'] += 1
        
        logging.info(f"Сообщение от {client_id}: {text}")
        
        response = {
            'type': 'echo',
            'message': f"Получено: {text}",
            'timestamp': datetime.now().isoformat()
        }
        
        await self.send_message(writer, response)
        
    async def send_message(self, writer, message):
        """Отправка сообщения клиенту"""
        try:
            if isinstance(message, dict):
                message = json.dumps(message)
            writer.write(message.encode('utf-8'))
            await writer.drain()
        except Exception as e:
            logging.error(f"Ошибка отправки сообщения: {e}")
            
    async def disconnect_client(self, client_id):
        """Отключение клиента"""
        if client_id in self.clients:
            try:
                self.clients[client_id]['writer'].close()
                await self.clients[client_id]['writer'].wait_closed()
            except:
                pass
            del self.clients[client_id]
            logging.info(f"Клиент {client_id} отключён")
            
    async def start(self):
        """Запуск сервера"""
        server = await asyncio.start_server(
            self.handle_client,
            self.host,
            self.port
        )
        
        logging.info(f"Async сервер запущен на {self.host}:{self.port}")
        
        async with server:
            await server.serve_forever()

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    server = AsyncSocketServer()
    try:
        asyncio.run(server.start())
    except KeyboardInterrupt:
        print("\nОстановка сервера...")

Практические кейсы использования

Где можно применить сокет-программирование в реальных проектах:

  • Системы мониторинга — собирать метрики с удалённых серверов
  • Логирование — централизованный сбор логов
  • Микросервисы — быстрая коммуникация между сервисами
  • Игровые сервера — real-time взаимодействие
  • IoT устройства — сбор данных с датчиков
  • Прокси-серверы — перенаправление трафика

Интеграция с другими технологиями

Сокеты отлично сочетаются с:

  • Redis — для кэширования сессий клиентов
  • PostgreSQL — для логирования сообщений
  • Docker — для контейнеризации сервера
  • Nginx — как reverse proxy
  • Prometheus — для мониторинга метрик

Пример интеграции с Redis для хранения сессий:

import redis
import json

class RedisSocketServer(SocketServer):
    def __init__(self, host='localhost', port=8888):
        super().__init__(host, port)
        self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
        
    def save_session(self, client_id, data):
        """Сохранение сессии в Redis"""
        self.redis_client.set(f"session:{client_id}", json.dumps(data), ex=3600)
        
    def get_session(self, client_id):
        """Получение сессии из Redis"""
        data = self.redis_client.get(f"session:{client_id}")
        return json.loads(data) if data else None

Безопасность и рекомендации

При работе с сокетами важно помнить о безопасности:

  • Валидация данных — всегда проверяйте входящие данные
  • Ограничение соединений — защита от DDoS
  • SSL/TLS — шифрование для чувствительных данных
  • Аутентификация — проверка подлинности клиентов
  • Логирование — фиксация всех действий

Пример с SSL:

import ssl

# Создание SSL контекста
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain('server.crt', 'server.key')

# Оборачиваем сокет в SSL
ssl_socket = context.wrap_socket(server_socket, server_side=True)

Альтернативы и похожие решения

Если сокеты кажутся слишком низкоуровневыми, рассмотрите альтернативы:

  • WebSocket — для веб-приложений (библиотека websockets)
  • ZeroMQ — высокоуровневая библиотека для сообщений
  • gRPC — для RPC вызовов
  • Apache Kafka — для больших объёмов данных
  • RabbitMQ — очереди сообщений

Деплой на продакшн

Для деплоя сокет-сервера рекомендую:

  • Использовать VPS с достаточным объёмом RAM
  • Настроить systemd для автозапуска
  • Использовать gunicorn или uwsgi для production
  • Настроить мониторинг с помощью Prometheus
  • Для высоконагруженных приложений — выделенный сервер

Пример systemd сервиса:

[Unit]
Description=Socket Server
After=network.target

[Service]
Type=simple
User=www-data
WorkingDirectory=/opt/socket-server
ExecStart=/usr/bin/python3 server.py
Restart=always

[Install]
WantedBy=multi-user.target

Выводы и рекомендации

Сокет-программирование в Python — это мощный инструмент для создания сетевых приложений. Начните с простых примеров, постепенно усложняя функциональность. Для production используйте asyncio, не забывайте о безопасности и мониторинге.

Основные рекомендации:

  • Для обучения — используйте базовые примеры с threading
  • Для production — asyncio или multiprocessing
  • Всегда обрабатывайте исключения
  • Логируйте все важные события
  • Тестируйте под нагрузкой

Сокеты — это основа для понимания сетевого программирования. Освоив их, вы сможете создавать более эффективные и надёжные серверные приложения, лучше понимать, как работают популярные фреймворки, и решать нестандартные задачи там, где высокоуровневые решения не подходят.


В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.

Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.

Leave a reply

Your email address will not be published. Required fields are marked