- Home »

Сокет-программирование в Python — пример сервера и клиента
Сокет-программирование — это то, о чём многие из нас слышали ещё в вузе, но потом благополучно забыли, погружаясь в мир REST API и высокоуровневых библиотек. Но знаете что? Понимание сокетов — это как знание SQL для программиста или умение работать с командной строкой для сисадмина. Базовый навык, который делает тебя более уверенным в своих действиях.
Сегодня разберём, как создать простой сервер и клиент на Python, используя встроенную библиотеку socket. Это особенно полезно для тех, кто работает с серверами, настраивает мониторинг, создаёт собственные системы логирования или просто хочет понять, как работает сетевое взаимодействие под капотом. Плюс — это отличная основа для создания собственных протоколов или простых микросервисов.
Как это работает под капотом
Сокет — это точка входа для сетевого взаимодействия. Представьте себе телефонную связь: один абонент (сервер) ждёт звонка на определённом номере (порту), а другой (клиент) набирает этот номер и устанавливает соединение. После этого они могут обмениваться данными.
В Python работа с сокетами происходит через модуль socket
, который предоставляет низкоуровневый интерфейс для работы с сетевыми соединениями. Основные типы сокетов:
- TCP (SOCK_STREAM) — надёжный, с установкой соединения
- UDP (SOCK_DGRAM) — быстрый, без установки соединения
Сегодня сосредоточимся на TCP, поскольку он гарантирует доставку данных и их порядок — именно то, что нужно для большинства серверных приложений.
Создаём простой TCP-сервер
Начнём с базового сервера, который будет слушать подключения и отвечать клиентам:
import socket
import threading
def handle_client(client_socket, address):
"""Обработка подключения клиента"""
print(f"Подключился клиент {address}")
try:
while True:
# Получаем данные от клиента
data = client_socket.recv(1024)
if not data:
break
message = data.decode('utf-8')
print(f"Получено от {address}: {message}")
# Отправляем ответ
response = f"Сервер получил: {message}"
client_socket.send(response.encode('utf-8'))
except Exception as e:
print(f"Ошибка при работе с клиентом {address}: {e}")
finally:
client_socket.close()
print(f"Клиент {address} отключился")
def start_server(host='localhost', port=8888):
"""Запуск сервера"""
# Создаём сокет
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Позволяем переиспользовать адрес
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
# Привязываем сокет к адресу и порту
server_socket.bind((host, port))
# Переводим сокет в режим прослушивания
server_socket.listen(5)
print(f"Сервер запущен на {host}:{port}")
while True:
# Ждём подключения
client_socket, address = server_socket.accept()
# Создаём поток для обработки клиента
client_thread = threading.Thread(
target=handle_client,
args=(client_socket, address)
)
client_thread.daemon = True
client_thread.start()
except KeyboardInterrupt:
print("\nОстановка сервера...")
except Exception as e:
print(f"Ошибка сервера: {e}")
finally:
server_socket.close()
if __name__ == "__main__":
start_server()
Создаём клиент
Теперь клиент, который будет подключаться к серверу и отправлять сообщения:
import socket
import threading
import time
def receive_messages(client_socket):
"""Получение сообщений от сервера"""
try:
while True:
response = client_socket.recv(1024)
if not response:
break
print(f"Ответ сервера: {response.decode('utf-8')}")
except Exception as e:
print(f"Ошибка при получении данных: {e}")
def start_client(host='localhost', port=8888):
"""Запуск клиента"""
try:
# Создаём сокет
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Подключаемся к серверу
client_socket.connect((host, port))
print(f"Подключён к серверу {host}:{port}")
# Запускаем поток для получения сообщений
receive_thread = threading.Thread(target=receive_messages, args=(client_socket,))
receive_thread.daemon = True
receive_thread.start()
# Отправляем сообщения
while True:
message = input()
if message.lower() == 'exit':
break
client_socket.send(message.encode('utf-8'))
except Exception as e:
print(f"Ошибка клиента: {e}")
finally:
client_socket.close()
if __name__ == "__main__":
start_client()
Тестируем наше решение
Для тестирования сохраните код сервера в файл server.py
, а клиента в client.py
. Затем:
# В первом терминале запускаем сервер
python server.py
# Во втором терминале запускаем клиент
python client.py
# Можно запустить несколько клиентов одновременно
python client.py
Попробуйте отправить несколько сообщений, подключить несколько клиентов — сервер должен обрабатывать их параллельно.
Продвинутый сервер с логированием
Теперь создадим более серьёзный сервер с логированием и обработкой ошибок:
import socket
import threading
import logging
import json
import time
from datetime import datetime
# Настройка логирования
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
class SocketServer:
def __init__(self, host='localhost', port=8888):
self.host = host
self.port = port
self.socket = None
self.clients = {}
self.running = False
def start(self):
"""Запуск сервера"""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
self.socket.bind((self.host, self.port))
self.socket.listen(10)
self.running = True
logging.info(f"Сервер запущен на {self.host}:{self.port}")
while self.running:
try:
client_socket, address = self.socket.accept()
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, address)
)
client_thread.daemon = True
client_thread.start()
except socket.error as e:
if self.running:
logging.error(f"Ошибка принятия соединения: {e}")
except Exception as e:
logging.error(f"Ошибка запуска сервера: {e}")
finally:
self.stop()
def handle_client(self, client_socket, address):
"""Обработка клиента"""
client_id = f"{address[0]}:{address[1]}"
self.clients[client_id] = {
'socket': client_socket,
'address': address,
'connected_at': datetime.now(),
'messages_count': 0
}
logging.info(f"Подключился клиент {client_id}")
try:
# Отправляем приветствие
welcome_msg = {
'type': 'welcome',
'message': f'Добро пожаловать! Вы подключены как {client_id}',
'timestamp': datetime.now().isoformat()
}
self.send_message(client_socket, welcome_msg)
while self.running:
data = client_socket.recv(1024)
if not data:
break
try:
# Пытаемся распарсить JSON
message = json.loads(data.decode('utf-8'))
self.process_message(client_socket, client_id, message)
except json.JSONDecodeError:
# Если не JSON, обрабатываем как обычный текст
text_message = data.decode('utf-8')
self.process_text_message(client_socket, client_id, text_message)
except Exception as e:
logging.error(f"Ошибка при работе с клиентом {client_id}: {e}")
finally:
self.disconnect_client(client_id)
def process_message(self, client_socket, client_id, message):
"""Обработка JSON сообщения"""
msg_type = message.get('type', 'text')
if msg_type == 'ping':
response = {
'type': 'pong',
'timestamp': datetime.now().isoformat()
}
self.send_message(client_socket, response)
elif msg_type == 'stats':
stats = {
'type': 'stats',
'connected_clients': len(self.clients),
'your_messages': self.clients[client_id]['messages_count'],
'server_uptime': str(datetime.now() - self.clients[client_id]['connected_at'])
}
self.send_message(client_socket, stats)
elif msg_type == 'broadcast':
# Отправляем сообщение всем клиентам
broadcast_msg = {
'type': 'broadcast',
'from': client_id,
'message': message.get('message', ''),
'timestamp': datetime.now().isoformat()
}
self.broadcast_message(broadcast_msg, exclude=client_id)
else:
self.process_text_message(client_socket, client_id, message.get('message', ''))
def process_text_message(self, client_socket, client_id, text):
"""Обработка текстового сообщения"""
self.clients[client_id]['messages_count'] += 1
logging.info(f"Сообщение от {client_id}: {text}")
response = {
'type': 'echo',
'message': f"Получено: {text}",
'timestamp': datetime.now().isoformat()
}
self.send_message(client_socket, response)
def send_message(self, client_socket, message):
"""Отправка сообщения клиенту"""
try:
if isinstance(message, dict):
message = json.dumps(message)
client_socket.send(message.encode('utf-8'))
except Exception as e:
logging.error(f"Ошибка отправки сообщения: {e}")
def broadcast_message(self, message, exclude=None):
"""Отправка сообщения всем клиентам"""
for client_id, client_info in self.clients.copy().items():
if client_id != exclude:
try:
self.send_message(client_info['socket'], message)
except:
self.disconnect_client(client_id)
def disconnect_client(self, client_id):
"""Отключение клиента"""
if client_id in self.clients:
try:
self.clients[client_id]['socket'].close()
except:
pass
del self.clients[client_id]
logging.info(f"Клиент {client_id} отключён")
def stop(self):
"""Остановка сервера"""
self.running = False
if self.socket:
self.socket.close()
logging.info("Сервер остановлен")
if __name__ == "__main__":
server = SocketServer()
try:
server.start()
except KeyboardInterrupt:
print("\nОстановка сервера...")
server.stop()
Производительность и сравнение подходов
Давайте сравним различные подходы к созданию сокет-серверов:
Подход | Преимущества | Недостатки | Производительность |
---|---|---|---|
Базовый сервер | Простота реализации | Только один клиент | Низкая |
Threading | Несколько клиентов | Ограничения GIL | Средняя |
Asyncio | Высокая производительность | Сложность кода | Высокая |
Multiprocessing | Истинная параллельность | Больше ресурсов | Высокая |
Asyncio версия для высокой нагрузки
Для серверов с высокой нагрузкой лучше использовать asyncio:
import asyncio
import json
import logging
from datetime import datetime
class AsyncSocketServer:
def __init__(self, host='localhost', port=8888):
self.host = host
self.port = port
self.clients = {}
async def handle_client(self, reader, writer):
"""Обработка клиента"""
client_address = writer.get_extra_info('peername')
client_id = f"{client_address[0]}:{client_address[1]}"
self.clients[client_id] = {
'writer': writer,
'connected_at': datetime.now(),
'messages_count': 0
}
logging.info(f"Подключился клиент {client_id}")
try:
welcome_msg = {
'type': 'welcome',
'message': f'Добро пожаловать! Вы подключены как {client_id}',
'timestamp': datetime.now().isoformat()
}
await self.send_message(writer, welcome_msg)
while True:
data = await reader.read(1024)
if not data:
break
try:
message = json.loads(data.decode('utf-8'))
await self.process_message(writer, client_id, message)
except json.JSONDecodeError:
text_message = data.decode('utf-8')
await self.process_text_message(writer, client_id, text_message)
except Exception as e:
logging.error(f"Ошибка при работе с клиентом {client_id}: {e}")
finally:
await self.disconnect_client(client_id)
async def process_message(self, writer, client_id, message):
"""Обработка JSON сообщения"""
msg_type = message.get('type', 'text')
if msg_type == 'ping':
response = {
'type': 'pong',
'timestamp': datetime.now().isoformat()
}
await self.send_message(writer, response)
elif msg_type == 'stats':
stats = {
'type': 'stats',
'connected_clients': len(self.clients),
'your_messages': self.clients[client_id]['messages_count'],
}
await self.send_message(writer, stats)
else:
await self.process_text_message(writer, client_id, message.get('message', ''))
async def process_text_message(self, writer, client_id, text):
"""Обработка текстового сообщения"""
self.clients[client_id]['messages_count'] += 1
logging.info(f"Сообщение от {client_id}: {text}")
response = {
'type': 'echo',
'message': f"Получено: {text}",
'timestamp': datetime.now().isoformat()
}
await self.send_message(writer, response)
async def send_message(self, writer, message):
"""Отправка сообщения клиенту"""
try:
if isinstance(message, dict):
message = json.dumps(message)
writer.write(message.encode('utf-8'))
await writer.drain()
except Exception as e:
logging.error(f"Ошибка отправки сообщения: {e}")
async def disconnect_client(self, client_id):
"""Отключение клиента"""
if client_id in self.clients:
try:
self.clients[client_id]['writer'].close()
await self.clients[client_id]['writer'].wait_closed()
except:
pass
del self.clients[client_id]
logging.info(f"Клиент {client_id} отключён")
async def start(self):
"""Запуск сервера"""
server = await asyncio.start_server(
self.handle_client,
self.host,
self.port
)
logging.info(f"Async сервер запущен на {self.host}:{self.port}")
async with server:
await server.serve_forever()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
server = AsyncSocketServer()
try:
asyncio.run(server.start())
except KeyboardInterrupt:
print("\nОстановка сервера...")
Практические кейсы использования
Где можно применить сокет-программирование в реальных проектах:
- Системы мониторинга — собирать метрики с удалённых серверов
- Логирование — централизованный сбор логов
- Микросервисы — быстрая коммуникация между сервисами
- Игровые сервера — real-time взаимодействие
- IoT устройства — сбор данных с датчиков
- Прокси-серверы — перенаправление трафика
Интеграция с другими технологиями
Сокеты отлично сочетаются с:
- Redis — для кэширования сессий клиентов
- PostgreSQL — для логирования сообщений
- Docker — для контейнеризации сервера
- Nginx — как reverse proxy
- Prometheus — для мониторинга метрик
Пример интеграции с Redis для хранения сессий:
import redis
import json
class RedisSocketServer(SocketServer):
def __init__(self, host='localhost', port=8888):
super().__init__(host, port)
self.redis_client = redis.Redis(host='localhost', port=6379, db=0)
def save_session(self, client_id, data):
"""Сохранение сессии в Redis"""
self.redis_client.set(f"session:{client_id}", json.dumps(data), ex=3600)
def get_session(self, client_id):
"""Получение сессии из Redis"""
data = self.redis_client.get(f"session:{client_id}")
return json.loads(data) if data else None
Безопасность и рекомендации
При работе с сокетами важно помнить о безопасности:
- Валидация данных — всегда проверяйте входящие данные
- Ограничение соединений — защита от DDoS
- SSL/TLS — шифрование для чувствительных данных
- Аутентификация — проверка подлинности клиентов
- Логирование — фиксация всех действий
Пример с SSL:
import ssl
# Создание SSL контекста
context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain('server.crt', 'server.key')
# Оборачиваем сокет в SSL
ssl_socket = context.wrap_socket(server_socket, server_side=True)
Альтернативы и похожие решения
Если сокеты кажутся слишком низкоуровневыми, рассмотрите альтернативы:
- WebSocket — для веб-приложений (библиотека websockets)
- ZeroMQ — высокоуровневая библиотека для сообщений
- gRPC — для RPC вызовов
- Apache Kafka — для больших объёмов данных
- RabbitMQ — очереди сообщений
Деплой на продакшн
Для деплоя сокет-сервера рекомендую:
- Использовать VPS с достаточным объёмом RAM
- Настроить systemd для автозапуска
- Использовать gunicorn или uwsgi для production
- Настроить мониторинг с помощью Prometheus
- Для высоконагруженных приложений — выделенный сервер
Пример systemd сервиса:
[Unit]
Description=Socket Server
After=network.target
[Service]
Type=simple
User=www-data
WorkingDirectory=/opt/socket-server
ExecStart=/usr/bin/python3 server.py
Restart=always
[Install]
WantedBy=multi-user.target
Выводы и рекомендации
Сокет-программирование в Python — это мощный инструмент для создания сетевых приложений. Начните с простых примеров, постепенно усложняя функциональность. Для production используйте asyncio, не забывайте о безопасности и мониторинге.
Основные рекомендации:
- Для обучения — используйте базовые примеры с threading
- Для production — asyncio или multiprocessing
- Всегда обрабатывайте исключения
- Логируйте все важные события
- Тестируйте под нагрузкой
Сокеты — это основа для понимания сетевого программирования. Освоив их, вы сможете создавать более эффективные и надёжные серверные приложения, лучше понимать, как работают популярные фреймворки, и решать нестандартные задачи там, где высокоуровневые решения не подходят.
В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.
Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.