Home » Очередь в C — реализация и использование
Очередь в C — реализация и использование

Очередь в C — реализация и использование

Если ты когда-нибудь работал с многопоточными приложениями или системами реального времени, то знаешь, что очереди — это не просто абстрактная структура данных из учебника. Это практический инструмент, который может превратить хаотичную обработку данных в стройную систему. Особенно это актуально для серверных приложений, где нужно обрабатывать тысячи запросов в секунду без потери данных и с минимальными задержками. Сегодня разберём, как правильно реализовать очередь в C, покажем реальные примеры и поделимся лайфхаками из production-окружения.

Как работает очередь в C — механизм под капотом

Очередь следует принципу FIFO (First In, First Out) — первый пришёл, первый ушёл. В отличие от стека, где элементы добавляются и удаляются с одного конца, в очереди добавление происходит в хвост (rear), а удаление — из головы (front). Это делает очередь идеальным инструментом для буферизации данных в серверных приложениях.

Базовая реализация может использовать:

  • Массив с циклическим буфером — быстро, но ограниченный размер
  • Связный список — динамический размер, но больше накладных расходов
  • Гибридные решения — комбинация массива и списка для оптимизации

Для серверных задач чаще всего используется циклический буфер, так как он даёт предсказуемую производительность и минимальную фрагментацию памяти.

Реализация очереди на массиве — быстро и эффективно

Начнём с классической реализации на базе массива. Эта версия отлично подойдёт для задач, где размер очереди известен заранее:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>

#define MAX_SIZE 1000

typedef struct {
    int data[MAX_SIZE];
    int front;
    int rear;
    int size;
} Queue;

Queue* createQueue() {
    Queue* q = (Queue*)malloc(sizeof(Queue));
    q->front = 0;
    q->rear = -1;
    q->size = 0;
    return q;
}

bool isEmpty(Queue* q) {
    return q->size == 0;
}

bool isFull(Queue* q) {
    return q->size == MAX_SIZE;
}

bool enqueue(Queue* q, int value) {
    if (isFull(q)) {
        return false;
    }
    q->rear = (q->rear + 1) % MAX_SIZE;
    q->data[q->rear] = value;
    q->size++;
    return true;
}

int dequeue(Queue* q) {
    if (isEmpty(q)) {
        return -1; // или другое значение для обозначения ошибки
    }
    int value = q->data[q->front];
    q->front = (q->front + 1) % MAX_SIZE;
    q->size--;
    return value;
}

int peek(Queue* q) {
    if (isEmpty(q)) {
        return -1;
    }
    return q->data[q->front];
}

void destroyQueue(Queue* q) {
    free(q);
}

Ключевая фишка здесь — использование модульной арифметики для циклического буфера. Когда rear достигает конца массива, он обнуляется и начинает заново, что позволяет эффективно использовать память.

Динамическая очередь на связном списке

Для задач, где размер очереди может сильно варьироваться, лучше использовать связный список. Вот thread-safe версия, которая пригодится в многопоточных приложениях:

#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <pthread.h>

typedef struct Node {
    int data;
    struct Node* next;
} Node;

typedef struct {
    Node* front;
    Node* rear;
    int size;
    pthread_mutex_t mutex;
} ThreadSafeQueue;

ThreadSafeQueue* createThreadSafeQueue() {
    ThreadSafeQueue* q = (ThreadSafeQueue*)malloc(sizeof(ThreadSafeQueue));
    q->front = NULL;
    q->rear = NULL;
    q->size = 0;
    pthread_mutex_init(&q->mutex, NULL);
    return q;
}

bool tsEnqueue(ThreadSafeQueue* q, int value) {
    Node* newNode = (Node*)malloc(sizeof(Node));
    if (!newNode) return false;
    
    newNode->data = value;
    newNode->next = NULL;
    
    pthread_mutex_lock(&q->mutex);
    
    if (q->rear == NULL) {
        q->front = q->rear = newNode;
    } else {
        q->rear->next = newNode;
        q->rear = newNode;
    }
    q->size++;
    
    pthread_mutex_unlock(&q->mutex);
    return true;
}

int tsDequeue(ThreadSafeQueue* q) {
    pthread_mutex_lock(&q->mutex);
    
    if (q->front == NULL) {
        pthread_mutex_unlock(&q->mutex);
        return -1;
    }
    
    Node* temp = q->front;
    int value = temp->data;
    q->front = q->front->next;
    
    if (q->front == NULL) {
        q->rear = NULL;
    }
    
    q->size--;
    free(temp);
    
    pthread_mutex_unlock(&q->mutex);
    return value;
}

void destroyThreadSafeQueue(ThreadSafeQueue* q) {
    while (q->front) {
        Node* temp = q->front;
        q->front = q->front->next;
        free(temp);
    }
    pthread_mutex_destroy(&q->mutex);
    free(q);
}

Практические примеры и кейсы использования

Давайте посмотрим на реальные сценарии, где очереди решают конкретные задачи:

Буферизация логов веб-сервера

#include <time.h>
#include <string.h>

#define LOG_BUFFER_SIZE 10000
#define LOG_ENTRY_SIZE 256

typedef struct {
    char entries[LOG_BUFFER_SIZE][LOG_ENTRY_SIZE];
    int front;
    int rear;
    int count;
    pthread_mutex_t mutex;
} LogBuffer;

LogBuffer* createLogBuffer() {
    LogBuffer* lb = (LogBuffer*)malloc(sizeof(LogBuffer));
    lb->front = 0;
    lb->rear = 0;
    lb->count = 0;
    pthread_mutex_init(&lb->mutex, NULL);
    return lb;
}

bool addLogEntry(LogBuffer* lb, const char* message) {
    pthread_mutex_lock(&lb->mutex);
    
    time_t now = time(NULL);
    char* timeStr = ctime(&now);
    timeStr[strlen(timeStr) - 1] = '\0'; // убираем \n
    
    snprintf(lb->entries[lb->rear], LOG_ENTRY_SIZE, 
             "[%s] %s", timeStr, message);
    
    lb->rear = (lb->rear + 1) % LOG_BUFFER_SIZE;
    
    if (lb->count < LOG_BUFFER_SIZE) {
        lb->count++;
    } else {
        lb->front = (lb->front + 1) % LOG_BUFFER_SIZE;
    }
    
    pthread_mutex_unlock(&lb->mutex);
    return true;
}

void flushLogsToFile(LogBuffer* lb, const char* filename) {
    FILE* file = fopen(filename, "a");
    if (!file) return;
    
    pthread_mutex_lock(&lb->mutex);
    
    int current = lb->front;
    for (int i = 0; i < lb->count; i++) {
        fprintf(file, "%s\n", lb->entries[current]);
        current = (current + 1) % LOG_BUFFER_SIZE;
    }
    
    lb->front = lb->rear = lb->count = 0;
    
    pthread_mutex_unlock(&lb->mutex);
    fclose(file);
}

Система обработки HTTP-запросов

#include <sys/socket.h>
#include <netinet/in.h>

typedef struct {
    int socket_fd;
    struct sockaddr_in client_addr;
    time_t received_time;
} HttpRequest;

typedef struct {
    HttpRequest requests[1000];
    int front;
    int rear;
    int count;
    pthread_mutex_t mutex;
    pthread_cond_t condition;
} RequestQueue;

RequestQueue* createRequestQueue() {
    RequestQueue* rq = (RequestQueue*)malloc(sizeof(RequestQueue));
    rq->front = 0;
    rq->rear = 0;
    rq->count = 0;
    pthread_mutex_init(&rq->mutex, NULL);
    pthread_cond_init(&rq->condition, NULL);
    return rq;
}

bool enqueueRequest(RequestQueue* rq, HttpRequest req) {
    pthread_mutex_lock(&rq->mutex);
    
    if (rq->count >= 1000) {
        pthread_mutex_unlock(&rq->mutex);
        return false; // очередь полна
    }
    
    rq->requests[rq->rear] = req;
    rq->rear = (rq->rear + 1) % 1000;
    rq->count++;
    
    pthread_cond_signal(&rq->condition);
    pthread_mutex_unlock(&rq->mutex);
    return true;
}

HttpRequest dequeueRequest(RequestQueue* rq) {
    pthread_mutex_lock(&rq->mutex);
    
    while (rq->count == 0) {
        pthread_cond_wait(&rq->condition, &rq->mutex);
    }
    
    HttpRequest req = rq->requests[rq->front];
    rq->front = (rq->front + 1) % 1000;
    rq->count--;
    
    pthread_mutex_unlock(&rq->mutex);
    return req;
}

void* workerThread(void* arg) {
    RequestQueue* rq = (RequestQueue*)arg;
    
    while (1) {
        HttpRequest req = dequeueRequest(rq);
        
        // Обработка запроса
        char buffer[1024];
        ssize_t bytes_read = recv(req.socket_fd, buffer, sizeof(buffer), 0);
        
        if (bytes_read > 0) {
            // Простейший HTTP-ответ
            const char* response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, World!";
            send(req.socket_fd, response, strlen(response), 0);
        }
        
        close(req.socket_fd);
    }
    
    return NULL;
}

Сравнение различных реализаций

Реализация Память Скорость enqueue Скорость dequeue Потокобезопасность Лучше для
Массив (циклический) Фиксированная O(1) O(1) Требует мьютексы Высокая производительность
Связный список Динамическая O(1) O(1) Требует мьютексы Переменный размер
Lock-free очередь Динамическая O(1)* O(1)* Встроенная Многопоточность
Приоритетная очередь Динамическая O(log n) O(log n) Требует мьютексы Задачи с приоритетами

* — в среднем случае, в worst case может быть больше из-за конкуренции

Оптимизации и продвинутые техники

В production-окружении стандартной реализации может быть недостаточно. Вот несколько техник для повышения производительности:

Lock-free очередь (базовая версия)

#include <stdatomic.h>

typedef struct LockFreeNode {
    atomic_int data;
    struct LockFreeNode* next;
} LockFreeNode;

typedef struct {
    LockFreeNode* head;
    LockFreeNode* tail;
} LockFreeQueue;

LockFreeQueue* createLockFreeQueue() {
    LockFreeQueue* q = (LockFreeQueue*)malloc(sizeof(LockFreeQueue));
    LockFreeNode* dummy = (LockFreeNode*)malloc(sizeof(LockFreeNode));
    dummy->next = NULL;
    q->head = q->tail = dummy;
    return q;
}

bool lfEnqueue(LockFreeQueue* q, int value) {
    LockFreeNode* newNode = (LockFreeNode*)malloc(sizeof(LockFreeNode));
    if (!newNode) return false;
    
    atomic_store(&newNode->data, value);
    newNode->next = NULL;
    
    LockFreeNode* prevTail = q->tail;
    while (!__sync_bool_compare_and_swap(&q->tail, prevTail, newNode)) {
        prevTail = q->tail;
    }
    prevTail->next = newNode;
    
    return true;
}

int lfDequeue(LockFreeQueue* q) {
    LockFreeNode* head = q->head;
    LockFreeNode* next = head->next;
    
    if (next == NULL) {
        return -1; // очередь пуста
    }
    
    int value = atomic_load(&next->data);
    
    if (__sync_bool_compare_and_swap(&q->head, head, next)) {
        free(head);
        return value;
    }
    
    return -1; // конкуренция, попробуйте ещё раз
}

Батчинг для повышения производительности

#define BATCH_SIZE 100

typedef struct {
    int data[BATCH_SIZE];
    int count;
} Batch;

typedef struct {
    Batch* batches;
    int capacity;
    int front;
    int rear;
    int size;
    pthread_mutex_t mutex;
} BatchQueue;

BatchQueue* createBatchQueue(int capacity) {
    BatchQueue* bq = (BatchQueue*)malloc(sizeof(BatchQueue));
    bq->batches = (Batch*)malloc(sizeof(Batch) * capacity);
    bq->capacity = capacity;
    bq->front = 0;
    bq->rear = 0;
    bq->size = 0;
    pthread_mutex_init(&bq->mutex, NULL);
    return bq;
}

bool enqueueBatch(BatchQueue* bq, int* values, int count) {
    if (count > BATCH_SIZE) return false;
    
    pthread_mutex_lock(&bq->mutex);
    
    if (bq->size >= bq->capacity) {
        pthread_mutex_unlock(&bq->mutex);
        return false;
    }
    
    Batch* batch = &bq->batches[bq->rear];
    memcpy(batch->data, values, sizeof(int) * count);
    batch->count = count;
    
    bq->rear = (bq->rear + 1) % bq->capacity;
    bq->size++;
    
    pthread_mutex_unlock(&bq->mutex);
    return true;
}

int dequeueBatch(BatchQueue* bq, int* buffer, int max_count) {
    pthread_mutex_lock(&bq->mutex);
    
    if (bq->size == 0) {
        pthread_mutex_unlock(&bq->mutex);
        return 0;
    }
    
    Batch* batch = &bq->batches[bq->front];
    int count = (batch->count < max_count) ? batch->count : max_count;
    
    memcpy(buffer, batch->data, sizeof(int) * count);
    
    bq->front = (bq->front + 1) % bq->capacity;
    bq->size--;
    
    pthread_mutex_unlock(&bq->mutex);
    return count;
}

Интеграция с популярными библиотеками

Часто нужно интегрировать очередь с существующими решениями. Вот примеры для популярных случаев:

Интеграция с Redis

#include <hiredis/hiredis.h>

typedef struct {
    redisContext* context;
    char* queue_name;
} RedisQueue;

RedisQueue* createRedisQueue(const char* host, int port, const char* queue_name) {
    RedisQueue* rq = (RedisQueue*)malloc(sizeof(RedisQueue));
    rq->context = redisConnect(host, port);
    
    if (rq->context == NULL || rq->context->err) {
        free(rq);
        return NULL;
    }
    
    rq->queue_name = strdup(queue_name);
    return rq;
}

bool redisEnqueue(RedisQueue* rq, const char* value) {
    redisReply* reply = redisCommand(rq->context, "LPUSH %s %s", 
                                   rq->queue_name, value);
    
    if (reply == NULL) return false;
    
    bool success = reply->type == REDIS_REPLY_INTEGER;
    freeReplyObject(reply);
    
    return success;
}

char* redisDequeue(RedisQueue* rq) {
    redisReply* reply = redisCommand(rq->context, "BRPOP %s 1", 
                                   rq->queue_name);
    
    if (reply == NULL || reply->type != REDIS_REPLY_ARRAY || 
        reply->elements != 2) {
        if (reply) freeReplyObject(reply);
        return NULL;
    }
    
    char* value = strdup(reply->element[1]->str);
    freeReplyObject(reply);
    
    return value;
}

Интеграция с ZeroMQ

#include <zmq.h>

typedef struct {
    void* context;
    void* socket;
} ZMQQueue;

ZMQQueue* createZMQQueue(const char* endpoint, int type) {
    ZMQQueue* zq = (ZMQQueue*)malloc(sizeof(ZMQQueue));
    
    zq->context = zmq_ctx_new();
    zq->socket = zmq_socket(zq->context, type);
    
    if (type == ZMQ_PUSH) {
        zmq_bind(zq->socket, endpoint);
    } else {
        zmq_connect(zq->socket, endpoint);
    }
    
    return zq;
}

bool zmqEnqueue(ZMQQueue* zq, const char* data, size_t size) {
    int rc = zmq_send(zq->socket, data, size, ZMQ_DONTWAIT);
    return rc != -1;
}

int zmqDequeue(ZMQQueue* zq, char* buffer, size_t buffer_size) {
    int rc = zmq_recv(zq->socket, buffer, buffer_size, 0);
    return rc;
}

void destroyZMQQueue(ZMQQueue* zq) {
    zmq_close(zq->socket);
    zmq_ctx_destroy(zq->context);
    free(zq);
}

Мониторинг и отладка

В продакшене важно следить за состоянием очередей. Вот полезные функции для мониторинга:

#include <sys/time.h>

typedef struct {
    long long total_enqueued;
    long long total_dequeued;
    long long current_size;
    long long max_size_reached;
    double avg_wait_time;
    struct timeval last_operation;
} QueueStats;

typedef struct {
    int data[1000];
    int front;
    int rear;
    int size;
    QueueStats stats;
    pthread_mutex_t mutex;
} MonitoredQueue;

MonitoredQueue* createMonitoredQueue() {
    MonitoredQueue* mq = (MonitoredQueue*)malloc(sizeof(MonitoredQueue));
    mq->front = 0;
    mq->rear = 0;
    mq->size = 0;
    
    // Инициализация статистики
    memset(&mq->stats, 0, sizeof(QueueStats));
    gettimeofday(&mq->stats.last_operation, NULL);
    
    pthread_mutex_init(&mq->mutex, NULL);
    return mq;
}

bool monitoredEnqueue(MonitoredQueue* mq, int value) {
    pthread_mutex_lock(&mq->mutex);
    
    if (mq->size >= 1000) {
        pthread_mutex_unlock(&mq->mutex);
        return false;
    }
    
    mq->data[mq->rear] = value;
    mq->rear = (mq->rear + 1) % 1000;
    mq->size++;
    
    // Обновление статистики
    mq->stats.total_enqueued++;
    mq->stats.current_size = mq->size;
    if (mq->size > mq->stats.max_size_reached) {
        mq->stats.max_size_reached = mq->size;
    }
    
    gettimeofday(&mq->stats.last_operation, NULL);
    
    pthread_mutex_unlock(&mq->mutex);
    return true;
}

int monitoredDequeue(MonitoredQueue* mq) {
    pthread_mutex_lock(&mq->mutex);
    
    if (mq->size == 0) {
        pthread_mutex_unlock(&mq->mutex);
        return -1;
    }
    
    int value = mq->data[mq->front];
    mq->front = (mq->front + 1) % 1000;
    mq->size--;
    
    // Обновление статистики
    mq->stats.total_dequeued++;
    mq->stats.current_size = mq->size;
    
    gettimeofday(&mq->stats.last_operation, NULL);
    
    pthread_mutex_unlock(&mq->mutex);
    return value;
}

void printQueueStats(MonitoredQueue* mq) {
    pthread_mutex_lock(&mq->mutex);
    
    printf("=== Queue Statistics ===\n");
    printf("Total enqueued: %lld\n", mq->stats.total_enqueued);
    printf("Total dequeued: %lld\n", mq->stats.total_dequeued);
    printf("Current size: %lld\n", mq->stats.current_size);
    printf("Max size reached: %lld\n", mq->stats.max_size_reached);
    printf("Throughput: %.2f ops/sec\n", 
           (double)(mq->stats.total_enqueued + mq->stats.total_dequeued) / 
           (time(NULL) - mq->stats.last_operation.tv_sec));
    
    pthread_mutex_unlock(&mq->mutex);
}

Полный пример: HTTP-сервер с очередью запросов

Собираем всё воедино в практический пример — простой HTTP-сервер с пулом воркеров:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#define PORT 8080
#define MAX_WORKERS 10
#define QUEUE_SIZE 1000

typedef struct {
    int client_fd;
    struct sockaddr_in client_addr;
} Request;

typedef struct {
    Request requests[QUEUE_SIZE];
    int front;
    int rear;
    int count;
    pthread_mutex_t mutex;
    pthread_cond_t not_empty;
    pthread_cond_t not_full;
} RequestQueue;

RequestQueue* queue;
pthread_t workers[MAX_WORKERS];
int server_running = 1;

RequestQueue* createRequestQueue() {
    RequestQueue* q = malloc(sizeof(RequestQueue));
    q->front = 0;
    q->rear = 0;
    q->count = 0;
    pthread_mutex_init(&q->mutex, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
    return q;
}

void enqueueRequest(RequestQueue* q, Request req) {
    pthread_mutex_lock(&q->mutex);
    
    while (q->count == QUEUE_SIZE) {
        pthread_cond_wait(&q->not_full, &q->mutex);
    }
    
    q->requests[q->rear] = req;
    q->rear = (q->rear + 1) % QUEUE_SIZE;
    q->count++;
    
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->mutex);
}

Request dequeueRequest(RequestQueue* q) {
    pthread_mutex_lock(&q->mutex);
    
    while (q->count == 0 && server_running) {
        pthread_cond_wait(&q->not_empty, &q->mutex);
    }
    
    Request req = {0};
    if (q->count > 0) {
        req = q->requests[q->front];
        q->front = (q->front + 1) % QUEUE_SIZE;
        q->count--;
        pthread_cond_signal(&q->not_full);
    }
    
    pthread_mutex_unlock(&q->mutex);
    return req;
}

void* worker(void* arg) {
    int worker_id = *((int*)arg);
    free(arg);
    
    printf("Worker %d started\n", worker_id);
    
    while (server_running) {
        Request req = dequeueRequest(queue);
        
        if (req.client_fd == 0) continue;
        
        // Чтение запроса
        char buffer[1024] = {0};
        read(req.client_fd, buffer, 1024);
        
        // Простейший HTTP-ответ
        const char* response = 
            "HTTP/1.1 200 OK\r\n"
            "Content-Type: text/plain\r\n"
            "Content-Length: 25\r\n"
            "\r\n"
            "Hello from worker %d!\n";
        
        char formatted_response[512];
        snprintf(formatted_response, sizeof(formatted_response), 
                response, worker_id);
        
        send(req.client_fd, formatted_response, strlen(formatted_response), 0);
        close(req.client_fd);
        
        printf("Worker %d handled request from %s\n", 
               worker_id, inet_ntoa(req.client_addr.sin_addr));
    }
    
    return NULL;
}

int main() {
    int server_fd;
    struct sockaddr_in address;
    int addrlen = sizeof(address);
    
    // Создание сокета
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    
    // Настройка адреса
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);
    
    // Привязка к порту
    if (bind(server_fd, (struct sockaddr*)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    
    // Прослушивание
    if (listen(server_fd, 3) < 0) {
        perror("listen failed");
        exit(EXIT_FAILURE);
    }
    
    // Создание очереди
    queue = createRequestQueue();
    
    // Запуск воркеров
    for (int i = 0; i < MAX_WORKERS; i++) {
        int* worker_id = malloc(sizeof(int));
        *worker_id = i;
        pthread_create(&workers[i], NULL, worker, worker_id);
    }
    
    printf("Server listening on port %d\n", PORT);
    
    // Основной цикл
    while (server_running) {
        struct sockaddr_in client_addr;
        int client_fd = accept(server_fd, (struct sockaddr*)&client_addr, 
                              (socklen_t*)&addrlen);
        
        if (client_fd < 0) {
            perror("accept failed");
            continue;
        }
        
        Request req = {client_fd, client_addr};
        enqueueRequest(queue, req);
    }
    
    // Завершение работы
    for (int i = 0; i < MAX_WORKERS; i++) {
        pthread_join(workers[i], NULL);
    }
    
    close(server_fd);
    return 0;
}

Компилируем и запускаем:

gcc -o http_server http_server.c -lpthread
./http_server

Тестируем с помощью curl:

curl http://localhost:8080/
ab -n 1000 -c 10 http://localhost:8080/

Альтернативные решения и библиотеки

Для продакшена стоит рассмотреть готовые решения:

  • ZeroMQ — высокопроизводительная библиотека для обмена сообщениями с различными паттернами очередей
  • Redis — in-memory база данных с отличной поддержкой очередей (LPUSH/RPOP, BRPOP)
  • RabbitMQ — полноценный брокер сообщений с C-клиентом
  • Apache Kafka — для больших объёмов данных с партиционированием
  • Beanstalkd — простой и быстрый сервер очередей

Для встраивания в приложения:

  • liblfds — lock-free структуры данных, включая очереди
  • Boost.Lockfree — C++ библиотека с биндингами для C
  • ConcurrentQueue — высокопроизводительная lock-free очередь

Если планируешь деплоить такие решения, рекомендую взять VPS с достаточным объёмом RAM для буферизации или выделенный сервер для высоконагруженных приложений.

Производительность и бенчмарки

Вот простой бенчмарк для сравнения разных реализаций:

#include <sys/time.h>

double get_time() {
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec + tv.tv_usec / 1000000.0;
}

void benchmark_queue() {
    const int OPERATIONS = 1000000;
    Queue* q = createQueue();
    
    double start = get_time();
    
    // Тест на добавление
    for (int i = 0; i < OPERATIONS; i++) {
        enqueue(q, i);
    }
    
    double enqueue_time = get_time() - start;
    
    // Тест на извлечение
    start = get_time();
    
    for (int i = 0; i < OPERATIONS; i++) {
        dequeue(q);
    }
    
    double dequeue_time = get_time() - start;
    
    printf("Enqueue: %.2f million ops/sec\n", 
           OPERATIONS / enqueue_time / 1000000.0);
    printf("Dequeue: %.2f million ops/sec\n", 
           OPERATIONS / dequeue_time / 1000000.0);
    
    destroyQueue(q);
}

void benchmark_multithreaded() {
    const int THREADS = 4;
    const int OPS_PER_THREAD = 250000;
    
    ThreadSafeQueue* q = createThreadSafeQueue();
    pthread_t producers[THREADS];
    pthread_t consumers[THREADS];
    
    double start = get_time();
    
    // Запуск продюсеров
    for (int i = 0; i < THREADS; i++) {
        int* thread_id = malloc(sizeof(int));
        *thread_id = i;
        pthread_create(&producers[i], NULL, producer_thread, thread_id);
    }
    
    // Запуск консьюмеров
    for (int i = 0; i < THREADS; i++) {
        int* thread_id = malloc(sizeof(int));
        *thread_id = i;
        pthread_create(&consumers[i], NULL, consumer_thread, thread_id);
    }
    
    // Ожидание завершения
    for (int i = 0; i < THREADS; i++) {
        pthread_join(producers[i], NULL);
        pthread_join(consumers[i], NULL);
    }
    
    double total_time = get_time() - start;
    
    printf("Multithreaded: %.2f million ops/sec\n", 
           (THREADS * OPS_PER_THREAD * 2) / total_time / 1000000.0);
    
    destroyThreadSafeQueue(q);
}

Отладка и решение проблем

Типичные проблемы и их решения:

  • Memory leaks — всегда освобождайте память, особенно в узлах связного списка
  • Deadlocks — используйте единый порядок захвата мьютексов
  • Race conditions — тщательно тестируйте многопоточный код
  • Buffer overflow — проверяйте границы массивов
  • Performance degradation — используйте профилировщики (gprof, perf)

Полезные инструменты для отладки:

# Проверка утечек памяти
valgrind --tool=memcheck --leak-check=full ./your_program

# Профилирование производительности
perf record -g ./your_program
perf report

# Анализ race conditions
helgrind ./your_program

Заключение

Очереди в C — это не просто учебная абстракция, а практический инструмент для решения реальных задач. Мы рассмотрели различные реализации, от простых массивов до сложных lock-free структур, и показали, как их применять в серверных приложениях.

Ключевые рекомендации:

  • Для небольших приложений — используйте простую реализацию на массиве
  • Для многопоточных систем — добавляйте синхронизацию или используйте lock-free алгоритмы
  • Для высоконагруженных систем — рассмотрите специализированные решения типа Redis или ZeroMQ
  • Для production — обязательно добавляйте мониторинг и логирование

Помните, что выбор реализации зависит от конкретных требований: размера данных, количества потоков, требований к производительности и надёжности. Начинайте с простого решения и усложняйте по мере необходимости.

Очереди открывают множество возможностей для автоматизации: от простого логирования до сложных систем обработки данных в реальном времени. Используйте их как строительные блоки для создания масштабируемых и надёжных приложений.


В этой статье собрана информация и материалы из различных интернет-источников. Мы признаем и ценим работу всех оригинальных авторов, издателей и веб-сайтов. Несмотря на то, что были приложены все усилия для надлежащего указания исходного материала, любая непреднамеренная оплошность или упущение не являются нарушением авторских прав. Все упомянутые товарные знаки, логотипы и изображения являются собственностью соответствующих владельцев. Если вы считаете, что какой-либо контент, использованный в этой статье, нарушает ваши авторские права, немедленно свяжитесь с нами для рассмотрения и принятия оперативных мер.

Данная статья предназначена исключительно для ознакомительных и образовательных целей и не ущемляет права правообладателей. Если какой-либо материал, защищенный авторским правом, был использован без должного упоминания или с нарушением законов об авторском праве, это непреднамеренно, и мы исправим это незамедлительно после уведомления. Обратите внимание, что переиздание, распространение или воспроизведение части или всего содержимого в любой форме запрещено без письменного разрешения автора и владельца веб-сайта. Для получения разрешений или дополнительных запросов, пожалуйста, свяжитесь с нами.

Leave a reply

Your email address will not be published. Required fields are marked