Microsoft Azure IoTHub client AMQP transport

Dependents:   sht15_remote_monitoring RobotArmDemo iothub_client_sample_amqp iothub_client_sample_amqp ... more

This library implements the AMQP transport for Microsoft Azure IoTHub client. The code is replicated from https://github.com/Azure/azure-iot-sdks

message_queue.c

Committer:
AzureIoTClient
Date:
2018-10-04
Revision:
57:56ac1346c70d
Parent:
56:8704100b3b54

File content as of revision 57:56ac1346c70d:

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

#include <stdlib.h>
#include <stdbool.h>
#include "azure_c_shared_utility/optimize_size.h"
#include "azure_c_shared_utility/crt_abstractions.h"
#include "azure_c_shared_utility/gballoc.h"
#include "azure_c_shared_utility/agenttime.h"
#include "azure_c_shared_utility/xlogging.h"
#include "azure_c_shared_utility/singlylinkedlist.h"

typedef struct MESSAGE_QUEUE_TAG MESSAGE_QUEUE;

#include "internal/message_queue.h"

#define RESULT_OK 0
#define INDEFINITE_TIME ((time_t)(-1))

static const char* SAVED_OPTION_MAX_RETRY_COUNT = "SAVED_OPTION_MAX_RETRY_COUNT";
static const char* SAVED_OPTION_MAX_ENQUEUE_TIME_SECS = "SAVED_OPTION_MAX_ENQUEUE_TIME_SECS";
static const char* SAVED_OPTION_MAX_PROCESSING_TIME_SECS = "SAVED_OPTION_MAX_PROCESSING_TIME_SECS";


struct MESSAGE_QUEUE_TAG
{
    size_t max_message_enqueued_time_secs;
    size_t max_message_processing_time_secs;
    size_t max_retry_count;

    PROCESS_MESSAGE_CALLBACK on_process_message_callback;
    void* on_process_message_context;

    SINGLYLINKEDLIST_HANDLE pending;
    SINGLYLINKEDLIST_HANDLE in_progress;
};

typedef struct MESSAGE_QUEUE_ITEM_TAG
{
    MQ_MESSAGE_HANDLE message;
    MESSAGE_PROCESSING_COMPLETED_CALLBACK on_message_processing_completed_callback;
    void* user_context;
    time_t enqueue_time;
    time_t processing_start_time;
    size_t number_of_attempts;
} MESSAGE_QUEUE_ITEM;



// ---------- Helper Functions ---------- //

static bool find_item_by_message_ptr(LIST_ITEM_HANDLE list_item, const void* match_context)
{
    MESSAGE_QUEUE_ITEM* current_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(list_item);
    MQ_MESSAGE_HANDLE* target_item = (MQ_MESSAGE_HANDLE*)match_context;

    return (current_item->message == target_item);
}

static void fire_message_callback(MESSAGE_QUEUE_ITEM* mq_item, MESSAGE_QUEUE_RESULT result, void* reason)
{
    if (mq_item->on_message_processing_completed_callback != NULL)
    {
        if (result == MESSAGE_QUEUE_RETRYABLE_ERROR)
        {
            result = MESSAGE_QUEUE_ERROR;
        }

        mq_item->on_message_processing_completed_callback(mq_item->message, result, reason, mq_item->user_context);
    }
}

static bool should_retry_sending(MESSAGE_QUEUE_HANDLE message_queue, MESSAGE_QUEUE_ITEM* mq_item, MESSAGE_QUEUE_RESULT result)
{
    return (result == MESSAGE_QUEUE_RETRYABLE_ERROR && mq_item->number_of_attempts <= message_queue->max_retry_count);
}

static int retry_sending_message(MESSAGE_QUEUE_HANDLE message_queue, LIST_ITEM_HANDLE list_item)
{
    int result;
    MESSAGE_QUEUE_ITEM* mq_item;

    mq_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(list_item);

    if (singlylinkedlist_remove(message_queue->in_progress, list_item))
    {
        LogError("Failed removing message from in-progress list");
        result = __FAILURE__;
    }
    else if (singlylinkedlist_add(message_queue->pending, (const void*)mq_item) == NULL)
    {
        LogError("Failed moving message back to pending list");
        result = __FAILURE__;
    }
    else
    {
        result = RESULT_OK;
    }

    return result;
}

static void dequeue_message_and_fire_callback(SINGLYLINKEDLIST_HANDLE list, LIST_ITEM_HANDLE list_item, MESSAGE_QUEUE_RESULT result, void* reason)
{
    MESSAGE_QUEUE_ITEM* mq_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(list_item);

    // Codes_SRS_MESSAGE_QUEUE_09_045: [If `message` is present in `message_queue->in_progress`, it shall be removed]
    if (singlylinkedlist_remove(list, list_item))
    {
        LogError("failed removing message from list (%p)", list);
    }

    // Codes_SRS_MESSAGE_QUEUE_09_049: [Otherwise `mq_item->on_message_processing_completed_callback` shall be invoked passing `mq_item->message`, `result`, `reason` and `mq_item->user_context`]
    fire_message_callback(mq_item, result, reason);

    // Codes_SRS_MESSAGE_QUEUE_09_050: [The `mq_item` related to `message` shall be freed]
    free(mq_item);
}

static void on_process_message_completed_callback(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, MESSAGE_QUEUE_RESULT result, USER_DEFINED_REASON reason)
{
    // Codes_SRS_MESSAGE_QUEUE_09_069: [If `message` or `message_queue` are NULL, on_message_processing_completed_callback shall return immediately]
    if (message == NULL || message_queue == NULL)
    {
        LogError("on_process_message_completed_callback invoked with NULL arguments (message=%p, message_queue=%p)", message, message_queue);
    }
    else
    {
        LIST_ITEM_HANDLE list_item;

        if ((list_item = singlylinkedlist_find(message_queue->in_progress, find_item_by_message_ptr, message)) == NULL)
        {
            // Codes_SRS_MESSAGE_QUEUE_09_044: [If `message` is not present in `message_queue->in_progress`, it shall be ignored]
            LogError("on_process_message_completed_callback invoked for a message not in the in-progress list (%p)", message);
        }
        else
        {
            MESSAGE_QUEUE_ITEM* mq_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(list_item);

            // Codes_SRS_MESSAGE_QUEUE_09_047: [If `result` is MESSAGE_QUEUE_RETRYABLE_ERROR and `mq_item->number_of_attempts` is less than or equal `message_queue->max_retry_count`, the `message` shall be moved to `message_queue->pending` to be re-sent]
            // Codes_SRS_MESSAGE_QUEUE_09_048: [If `result` is MESSAGE_QUEUE_RETRYABLE_ERROR and `mq_item->number_of_attempts` is greater than `message_queue->max_retry_count`, result shall be changed to MESSAGE_QUEUE_ERROR]
            if (!should_retry_sending(message_queue, mq_item, result) || retry_sending_message(message_queue, list_item) != RESULT_OK)
            {
                dequeue_message_and_fire_callback(message_queue->in_progress, list_item, result, reason);
            }
        }
    }
}

static void process_timeouts(MESSAGE_QUEUE_HANDLE message_queue)
{
    time_t current_time;

    if ((current_time = get_time(NULL)) == INDEFINITE_TIME)
    {
        LogError("failed processing timeouts (get_time failed)");
    }
    else
    {
        // Codes_SRS_MESSAGE_QUEUE_09_035: [If `message_queue->max_message_enqueued_time_secs` is greater than zero, `message_queue->in_progress` and `message_queue->pending` items shall be checked for timeout]
        if (message_queue->max_message_enqueued_time_secs > 0)
        {
            LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(message_queue->pending);

            while (list_item != NULL)
            {
                LIST_ITEM_HANDLE current_list_item = list_item;
                MESSAGE_QUEUE_ITEM* mq_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(current_list_item);

                list_item = singlylinkedlist_get_next_item(list_item);

                if (mq_item == NULL)
                {
                    LogError("failed processing timeouts (unexpected NULL pointer to MESSAGE_QUEUE_ITEM)");
                }
                else if (get_difftime(current_time, mq_item->enqueue_time) >= message_queue->max_message_enqueued_time_secs)
                {
                    // Codes_SRS_MESSAGE_QUEUE_09_036: [If any items are in `message_queue` lists for `message_queue->max_message_enqueued_time_secs` or more, they shall be removed and `message_queue->on_message_processing_completed_callback` invoked with MESSAGE_QUEUE_TIMEOUT]
                    dequeue_message_and_fire_callback(message_queue->pending, current_list_item, MESSAGE_QUEUE_TIMEOUT, NULL);
                }
                else
                {
                    // The pending list order is already based on enqueue time, so if one message is not expired, later ones won't be either.
                    break;
                }
            }

            list_item = singlylinkedlist_get_head_item(message_queue->in_progress);

            while (list_item != NULL)
            {
                LIST_ITEM_HANDLE current_list_item = list_item;
                MESSAGE_QUEUE_ITEM* mq_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(current_list_item);

                list_item = singlylinkedlist_get_next_item(list_item);

                if (mq_item == NULL)
                {
                    LogError("failed processing timeouts (unexpected NULL pointer to MESSAGE_QUEUE_ITEM)");
                }
                else if (get_difftime(current_time, mq_item->enqueue_time) >= message_queue->max_message_enqueued_time_secs)
                {
                    // Codes_SRS_MESSAGE_QUEUE_09_038: [If any items are in `message_queue->in_progress` for `message_queue->max_message_processing_time_secs` or more, they shall be removed and `message_queue->on_message_processing_completed_callback` invoked with MESSAGE_QUEUE_TIMEOUT]
                    dequeue_message_and_fire_callback(message_queue->in_progress, current_list_item, MESSAGE_QUEUE_TIMEOUT, NULL);
                }
            }
        }

        // Codes_SRS_MESSAGE_QUEUE_09_037: [If `message_queue->max_message_processing_time_secs` is greater than zero, `message_queue->in_progress` items shall be checked for timeout]
        if (message_queue->max_message_processing_time_secs > 0)
        {
            LIST_ITEM_HANDLE list_item = singlylinkedlist_get_head_item(message_queue->in_progress);

            while (list_item != NULL)
            {
                LIST_ITEM_HANDLE current_list_item = list_item;
                MESSAGE_QUEUE_ITEM* mq_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(current_list_item);

                list_item = singlylinkedlist_get_next_item(list_item);

                if (mq_item == NULL)
                {
                    LogError("failed processing timeouts (unexpected NULL pointer to MESSAGE_QUEUE_ITEM)");
                }
                else if (get_difftime(current_time, mq_item->processing_start_time) >= message_queue->max_message_processing_time_secs)
                {
                    dequeue_message_and_fire_callback(message_queue->in_progress, current_list_item, MESSAGE_QUEUE_TIMEOUT, NULL);
                }
                else
                {
                    // The in-progress list order is already based on start-processing time, so if one message is not expired, later ones won't be either.
                    break;
                }
            }
        }
    }
}

static void process_pending_messages(MESSAGE_QUEUE_HANDLE message_queue)
{
    LIST_ITEM_HANDLE list_item;

    while ((list_item = singlylinkedlist_get_head_item(message_queue->pending)) != NULL)
    {
        MESSAGE_QUEUE_ITEM* mq_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(list_item);

        if (mq_item == NULL)
        {
            LogError("internal error, failed to retrieve list node value");
            break;
        }
        else if (singlylinkedlist_remove(message_queue->pending, list_item) != 0)
        {
            LogError("failed moving message out of pending list (%p)", mq_item->message);

            // Codes_SRS_MESSAGE_QUEUE_09_042: [If any failures occur, `mq_item->on_message_processing_completed_callback` shall be invoked with MESSAGE_QUEUE_ERROR and `mq_item` freed]
            if (mq_item->on_message_processing_completed_callback != NULL)
            {
                mq_item->on_message_processing_completed_callback(mq_item->message, MESSAGE_QUEUE_ERROR, NULL, mq_item->user_context);
            }

            // Not freeing since this would cause a memory A/V on the next call.

            break; // Trying to avoid an infinite loop
        }
        // Codes_SRS_MESSAGE_QUEUE_09_040: [`mq_item->processing_start_time` shall be set using get_time()]
        else if ((mq_item->processing_start_time = get_time(NULL)) == INDEFINITE_TIME)
        {
            // Codes_SRS_MESSAGE_QUEUE_09_041: [If get_time() fails, `mq_item` shall be removed from `message_queue->in_progress`]
            LogError("failed setting message processing_start_time (%p)", mq_item->message);

            // Codes_SRS_MESSAGE_QUEUE_09_042: [If any failures occur, `mq_item->on_message_processing_completed_callback` shall be invoked with MESSAGE_QUEUE_ERROR and `mq_item` freed]
            if (mq_item->on_message_processing_completed_callback != NULL)
            {
                mq_item->on_message_processing_completed_callback(mq_item->message, MESSAGE_QUEUE_ERROR, NULL, mq_item->user_context);
            }

            free(mq_item);
        }
        // Codes_SRS_MESSAGE_QUEUE_09_039: [Each `mq_item` in `message_queue->pending` shall be moved to `message_queue->in_progress`]
        else if (singlylinkedlist_add(message_queue->in_progress, (const void*)mq_item) == NULL)
        {
            LogError("failed moving message to in-progress list (%p)", mq_item->message);

            // Codes_SRS_MESSAGE_QUEUE_09_042: [If any failures occur, `mq_item->on_message_processing_completed_callback` shall be invoked with MESSAGE_QUEUE_ERROR and `mq_item` freed]
            if (mq_item->on_message_processing_completed_callback != NULL)
            {
                mq_item->on_message_processing_completed_callback(mq_item->message, MESSAGE_QUEUE_ERROR, NULL, mq_item->user_context);
            }

            free(mq_item);
        }
        else
        {
            mq_item->number_of_attempts++;

            // Codes_SRS_MESSAGE_QUEUE_09_043: [If no failures occur, `message_queue->on_process_message_callback` shall be invoked passing `mq_item->message` and `on_process_message_completed_callback`]
            message_queue->on_process_message_callback(message_queue, mq_item->message, on_process_message_completed_callback, mq_item->user_context);
        }
    }
}

static void* cloneOption(const char* name, const void* value)
{
    void* result;

    if (name == NULL || value == NULL)
    {
        LogError("invalid argument (name=%p, value=%p)", name, value);
        result = NULL;
    }
    else if (strcmp(SAVED_OPTION_MAX_ENQUEUE_TIME_SECS, name) == 0 ||
        strcmp(SAVED_OPTION_MAX_PROCESSING_TIME_SECS, name) == 0 ||
        strcmp(SAVED_OPTION_MAX_RETRY_COUNT, name) == 0)
    {
        if ((result = malloc(sizeof(size_t))) == NULL)
        {
            LogError("failed cloning option %s (malloc failed)", name);
        }
        else
        {
            memcpy(result, value, sizeof(size_t));
        }
    }
    else
    {
        LogError("option %s is invalid", name);
        result = NULL;
    }

    return result;
}

static void destroyOption(const char* name, const void* value)
{
    if (name == NULL || value == NULL)
    {
        LogError("invalid argument (name=%p, value=%p)", name, value);
    }
    else if (strcmp(SAVED_OPTION_MAX_ENQUEUE_TIME_SECS, name) == 0 ||
        strcmp(SAVED_OPTION_MAX_PROCESSING_TIME_SECS, name) == 0 ||
        strcmp(SAVED_OPTION_MAX_RETRY_COUNT, name) == 0)
    {
        free((void*)value);
    }
    else
    {
        LogError("option %s is invalid", name);
    }
}



// ---------- Public APIs ---------- //

void message_queue_remove_all(MESSAGE_QUEUE_HANDLE message_queue)
{
    // Codes_SRS_MESSAGE_QUEUE_09_026: [If `message_queue` is NULL, message_queue_retrieve_options shall return]
    if (message_queue != NULL)
    {
        LIST_ITEM_HANDLE list_item;

        // Codes_SRS_MESSAGE_QUEUE_09_027: [Each `mq_item` in `message_queue->pending` and `message_queue->in_progress` lists shall be removed]
        while ((list_item = singlylinkedlist_get_head_item(message_queue->in_progress)) != NULL)
        {
            // Codes_SRS_MESSAGE_QUEUE_09_028: [`message_queue->on_message_processing_completed_callback` shall be invoked with MESSAGE_QUEUE_CANCELLED for each `mq_item` removed]
            // Codes_SRS_MESSAGE_QUEUE_09_029: [Each `mq_item` shall be freed]
            dequeue_message_and_fire_callback(message_queue->in_progress, list_item, MESSAGE_QUEUE_CANCELLED, NULL);
        }

        while ((list_item = singlylinkedlist_get_head_item(message_queue->pending)) != NULL)
        {
            // Codes_SRS_MESSAGE_QUEUE_09_028: [`message_queue->on_message_processing_completed_callback` shall be invoked with MESSAGE_QUEUE_CANCELLED for each `mq_item` removed]
            // Codes_SRS_MESSAGE_QUEUE_09_029: [Each `mq_item` shall be freed]
            dequeue_message_and_fire_callback(message_queue->pending, list_item, MESSAGE_QUEUE_CANCELLED, NULL);
        }
    }
}

static int move_messages_between_lists(SINGLYLINKEDLIST_HANDLE from_list, SINGLYLINKEDLIST_HANDLE to_list)
{
    int result;
    LIST_ITEM_HANDLE list_item;

    result = RESULT_OK;

    while ((list_item = singlylinkedlist_get_head_item(from_list)) != NULL)
    {
        if (singlylinkedlist_remove(from_list, list_item) != 0)
        {
            LogError("failed removing message from list");
            result = __FAILURE__;
        }
        else
        {
            MESSAGE_QUEUE_ITEM* mq_item = (MESSAGE_QUEUE_ITEM*)singlylinkedlist_item_get_value(list_item);

            if (singlylinkedlist_add(to_list, (const void*)mq_item) != 0)
            {
                LogError("failed moving message to list");

                fire_message_callback(mq_item, MESSAGE_QUEUE_CANCELLED, NULL);

                free(mq_item);

                result = __FAILURE__;

                break;
            }
            else
            {
                mq_item->number_of_attempts = 0;
                mq_item->processing_start_time = INDEFINITE_TIME;
            }
        }
    }

    return result;
}

int message_queue_move_all_back_to_pending(MESSAGE_QUEUE_HANDLE message_queue)
{
    int result;

    if (message_queue == NULL)
    {
        LogError("invalid argument (message_queue is NULL)");
        result = __FAILURE__;
    }
    else
    {
        SINGLYLINKEDLIST_HANDLE temp_list;

        if ((temp_list = singlylinkedlist_create()) == NULL)
        {
            LogError("failed creating temporary list");
            result = __FAILURE__;
        }
        else
        {
            if (move_messages_between_lists(message_queue->in_progress, temp_list) != 0)
            {
                LogError("failed moving in-progress message to temporary list");
                result = __FAILURE__;
            }
            else if (move_messages_between_lists(message_queue->pending, temp_list) != 0)
            {
                LogError("failed moving pending message to temporary list");
                result = __FAILURE__;
            }
            else if (move_messages_between_lists(temp_list, message_queue->pending) != 0)
            {
                LogError("failed moving pending message to temporary list");
                result = __FAILURE__;
            }
            else
            {
                result = RESULT_OK;
            }

            if (result != RESULT_OK)
            {
                LIST_ITEM_HANDLE list_item;

                while ((list_item = singlylinkedlist_get_head_item(temp_list)) != NULL)
                {
                    dequeue_message_and_fire_callback(temp_list, list_item, MESSAGE_QUEUE_CANCELLED, NULL);
                }
            }

            singlylinkedlist_destroy(temp_list);
        }
    }

    return result;
}

void message_queue_destroy(MESSAGE_QUEUE_HANDLE message_queue)
{
    // Codes_SRS_MESSAGE_QUEUE_09_013: [If `message_queue` is NULL, message_queue_destroy shall return immediately]
    if (message_queue != NULL)
    {
        // Codes_SRS_MESSAGE_QUEUE_09_014: [message_queue_destroy shall invoke message_queue_remove_all]
        message_queue_remove_all(message_queue);

        // Codes_SRS_MESSAGE_QUEUE_09_015: [message_queue_destroy shall free all memory allocated and pointed by `message_queue`]
        if (message_queue->pending != NULL)
        {
            singlylinkedlist_destroy(message_queue->pending);
        }

        if (message_queue->in_progress != NULL)
        {
            singlylinkedlist_destroy(message_queue->in_progress);
        }

        free(message_queue);
    }
}

MESSAGE_QUEUE_HANDLE message_queue_create(MESSAGE_QUEUE_CONFIG* config)
{
    MESSAGE_QUEUE* result;

    // Codes_SRS_MESSAGE_QUEUE_09_001: [If `config` is NULL, message_queue_create shall fail and return NULL]
    if (config == NULL)
    {
        LogError("invalid configuration (NULL)");
        result = NULL;
    }
    // Codes_SRS_MESSAGE_QUEUE_09_002: [If `config->on_process_message_callback` is NULL, message_queue_create shall fail and return NULL]
    else if (config->on_process_message_callback == NULL)
    {
        LogError("invalid configuration (on_process_message_callback is NULL)");
        result = NULL;
    }
    // Codes_SRS_MESSAGE_QUEUE_09_004: [Memory shall be allocated for the MESSAGE_QUEUE data structure (aka `message_queue`)]
    else if ((result = (MESSAGE_QUEUE*)malloc(sizeof(MESSAGE_QUEUE))) == NULL)
    {
        // Codes_SRS_MESSAGE_QUEUE_09_005: [If `instance` cannot be allocated, message_queue_create shall fail and return NULL]
        LogError("failed allocating MESSAGE_QUEUE");
        result = NULL;
    }
    else
    {
        memset(result, 0, sizeof(MESSAGE_QUEUE));

        // Codes_SRS_MESSAGE_QUEUE_09_006: [`message_queue->pending` shall be set using singlylinkedlist_create()]
        if ((result->pending = singlylinkedlist_create()) == NULL)
        {
            // Codes_SRS_MESSAGE_QUEUE_09_007: [If singlylinkedlist_create fails, message_queue_create shall fail and return NULL]
            LogError("failed allocating MESSAGE_QUEUE pending list");
            // Codes_SRS_MESSAGE_QUEUE_09_011: [If any failures occur, message_queue_create shall release all memory it has allocated]
            message_queue_destroy(result);
            result = NULL;
        }
        // Codes_SRS_MESSAGE_QUEUE_09_008: [`message_queue->in_progress` shall be set using singlylinkedlist_create()]
        else if ((result->in_progress = singlylinkedlist_create()) == NULL)
        {
            // Codes_SRS_MESSAGE_QUEUE_09_009: [If singlylinkedlist_create fails, message_queue_create shall fail and return NULL]
            LogError("failed allocating MESSAGE_QUEUE in-progress list");
            // Codes_SRS_MESSAGE_QUEUE_09_011: [If any failures occur, message_queue_create shall release all memory it has allocated]
            message_queue_destroy(result);
            result = NULL;
        }
        else
        {
            // Codes_SRS_MESSAGE_QUEUE_09_010: [All arguments in `config` shall be saved into `message_queue`]
            // Codes_SRS_MESSAGE_QUEUE_09_012: [If no failures occur, message_queue_create shall return the `message_queue` pointer]

            result->max_message_enqueued_time_secs = config->max_message_enqueued_time_secs;
            result->max_message_processing_time_secs = config->max_message_processing_time_secs;
            result->max_retry_count = config->max_retry_count;
            result->on_process_message_callback = config->on_process_message_callback;
        }
    }

    return result;
}

int message_queue_add(MESSAGE_QUEUE_HANDLE message_queue, MQ_MESSAGE_HANDLE message, MESSAGE_PROCESSING_COMPLETED_CALLBACK on_message_processing_completed_callback, void* user_context)
{
    int result;

    // Codes_SRS_MESSAGE_QUEUE_09_016: [If `message_queue` or `message` are NULL, message_queue_add shall fail and return non-zero]
    if (message_queue == NULL || message == NULL)
    {
        LogError("invalid argument (message_queue=%p, message=%p)", message_queue, message);
        result = __FAILURE__;
    }
    else
    {
        MESSAGE_QUEUE_ITEM* mq_item;

        // Codes_SRS_MESSAGE_QUEUE_09_017: [message_queue_add shall allocate a structure (aka `mq_item`) to save the `message`]
        if ((mq_item = (MESSAGE_QUEUE_ITEM*)malloc(sizeof(MESSAGE_QUEUE_ITEM))) == NULL)
        {
            // Codes_SRS_MESSAGE_QUEUE_09_018: [If `mq_item` cannot be allocated, message_queue_add shall fail and return non-zero]
            LogError("failed creating container for message");
            result = __FAILURE__;
        }
        else
        {
            memset(mq_item, 0, sizeof(MESSAGE_QUEUE_ITEM));

            // Codes_SRS_MESSAGE_QUEUE_09_019: [`mq_item->enqueue_time` shall be set using get_time()]
            if ((mq_item->enqueue_time = get_time(NULL)) == INDEFINITE_TIME)
            {
                // Codes_SRS_MESSAGE_QUEUE_09_020: [If get_time fails, message_queue_add shall fail and return non-zero]
                LogError("failed setting message enqueue time");
                // Codes_SRS_MESSAGE_QUEUE_09_024: [If any failures occur, message_queue_add shall release all memory it has allocated]
                free(mq_item);
                result = __FAILURE__;
            }
            // Codes_SRS_MESSAGE_QUEUE_09_021: [`mq_item` shall be added to `message_queue->pending` list]
            else if (singlylinkedlist_add(message_queue->pending, (const void*)mq_item) == NULL)
            {
                // Codes_SRS_MESSAGE_QUEUE_09_022: [`mq_item` fails to be added to `message_queue->pending`, message_queue_add shall fail and return non-zero]
                LogError("failed enqueing message");
                // Codes_SRS_MESSAGE_QUEUE_09_024: [If any failures occur, message_queue_add shall release all memory it has allocated]
                free(mq_item);
                result = __FAILURE__;
            }
            else
            {
                // Codes_SRS_MESSAGE_QUEUE_09_023: [`message` shall be saved into `mq_item->message`]
                mq_item->message = message;
                mq_item->on_message_processing_completed_callback = on_message_processing_completed_callback;
                mq_item->user_context = user_context;
                mq_item->processing_start_time = INDEFINITE_TIME;
                // Codes_SRS_MESSAGE_QUEUE_09_025: [If no failures occur, message_queue_add shall return 0]
                result = RESULT_OK;
            }
        }
    }

    return result;
}

int message_queue_is_empty(MESSAGE_QUEUE_HANDLE message_queue, bool* is_empty)
{
    int result;

    // Codes_SRS_MESSAGE_QUEUE_09_030: [If `message_queue` or `is_empty` are NULL, message_queue_is_empty shall fail and return non-zero]
    if (message_queue == NULL || is_empty == NULL)
    {
        LogError("invalid argument (message_queue=%p, is_empty=%p)", message_queue, is_empty);
        result = __FAILURE__;
    }
    else
    {
        // Codes_SRS_MESSAGE_QUEUE_09_031: [If `message_queue->pending` and `message_queue->in_progress` are empty, `is_empty` shall be set to true]
        // Codes_SRS_MESSAGE_QUEUE_09_032: [Otherwise `is_empty` shall be set to false]
        *is_empty = (singlylinkedlist_get_head_item(message_queue->pending) == NULL && singlylinkedlist_get_head_item(message_queue->in_progress) == NULL);
        // Codes_SRS_MESSAGE_QUEUE_09_033: [If no failures occur, message_queue_is_empty shall return 0]
        result = RESULT_OK;
    }

    return result;
}

void message_queue_do_work(MESSAGE_QUEUE_HANDLE message_queue)
{
    // Codes_SRS_MESSAGE_QUEUE_09_034: [If `message_queue` is NULL, message_queue_do_work shall return immediately]
    if (message_queue != NULL)
    {
        process_timeouts(message_queue);
        process_pending_messages(message_queue);
    }
}

int message_queue_set_max_message_enqueued_time_secs(MESSAGE_QUEUE_HANDLE message_queue, size_t seconds)
{
    int result;

    // Codes_SRS_MESSAGE_QUEUE_09_051: [If `message_queue` is NULL, message_queue_set_max_message_enqueued_time_secs shall fail and return non-zero]
    if (message_queue == NULL)
    {
        LogError("invalid argument (message_queue is NULL)");
        result = __FAILURE__;
    }
    else
    {
        // Codes_SRS_MESSAGE_QUEUE_09_053: [`seconds` shall be saved into `message_queue->max_message_enqueued_time_secs`]
        message_queue->max_message_enqueued_time_secs = seconds;
        // Codes_SRS_MESSAGE_QUEUE_09_054: [If no failures occur, message_queue_set_max_message_enqueued_time_secs shall return 0]
        result = RESULT_OK;
    }

    return result;
}

int message_queue_set_max_message_processing_time_secs(MESSAGE_QUEUE_HANDLE message_queue, size_t seconds)
{
    int result;

    // Codes_SRS_MESSAGE_QUEUE_09_055: [If `message_queue` is NULL, message_queue_set_max_message_processing_time_secs shall fail and return non-zero]
    if (message_queue == NULL)
    {
        LogError("invalid argument (message_queue is NULL)");
        result = __FAILURE__;
    }
    else
    {
        // Codes_SRS_MESSAGE_QUEUE_09_057: [`seconds` shall be saved into `message_queue->max_message_processing_time_secs`]
        message_queue->max_message_processing_time_secs = seconds;
        // Codes_SRS_MESSAGE_QUEUE_09_058: [If no failures occur, message_queue_set_max_message_processing_time_secs shall return 0]
        result = RESULT_OK;
    }

    return result;
}

int message_queue_set_max_retry_count(MESSAGE_QUEUE_HANDLE message_queue, size_t max_retry_count)
{
    int result;

    // Codes_SRS_MESSAGE_QUEUE_09_059: [If `message_queue` is NULL, message_queue_set_max_retry_count shall fail and return non-zero]
    if (message_queue == NULL)
    {
        LogError("invalid argument (message_queue is NULL)");
        result = __FAILURE__;
    }
    else
    {
        // Codes_SRS_MESSAGE_QUEUE_09_061: [If no failures occur, message_queue_set_max_retry_count shall return 0]
        message_queue->max_retry_count = max_retry_count;
        // Codes_SRS_MESSAGE_QUEUE_09_060: [`max_retry_count` shall be saved into `message_queue->max_retry_count`]
        result = RESULT_OK;
    }

    return result;
}

static int setOption(void* handle, const char* name, const void* value)
{
    int result;

    if (handle == NULL || name == NULL || value == NULL)
    {
        LogError("invalid argument (handle=%p, name=%p, value=%p)", handle, name, value);
        result = __FAILURE__;
    }
    else if (strcmp(SAVED_OPTION_MAX_ENQUEUE_TIME_SECS, name) == 0)
    {
        if (message_queue_set_max_message_enqueued_time_secs((MESSAGE_QUEUE_HANDLE)handle, *(size_t*)value) != RESULT_OK)
        {
            LogError("failed setting option %s", name);
            result = __FAILURE__;
        }
        else
        {
            result = RESULT_OK;
        }
    }
    else if (strcmp(SAVED_OPTION_MAX_PROCESSING_TIME_SECS, name) == 0)
    {
        if (message_queue_set_max_message_processing_time_secs((MESSAGE_QUEUE_HANDLE)handle, *(size_t*)value) != RESULT_OK)
        {
            LogError("failed setting option %s", name);
            result = __FAILURE__;
        }
        else
        {
            result = RESULT_OK;
        }
    }
    else if (strcmp(SAVED_OPTION_MAX_RETRY_COUNT, name) == 0)
    {
        if (message_queue_set_max_retry_count((MESSAGE_QUEUE_HANDLE)handle, *(size_t*)value) != RESULT_OK)
        {
            LogError("failed setting option %s", name);
            result = __FAILURE__;
        }
        else
        {
            result = RESULT_OK;
        }
    }
    else
    {
        LogError("option %s is invalid", name);
        result = __FAILURE__;
    }

    return result;
}

OPTIONHANDLER_HANDLE message_queue_retrieve_options(MESSAGE_QUEUE_HANDLE message_queue)
{
    OPTIONHANDLER_HANDLE result;

    // Codes_SRS_MESSAGE_QUEUE_09_062: [If `message_queue` is NULL, message_queue_retrieve_options shall fail and return NULL]
    if (message_queue == NULL)
    {
        LogError("invalid argument (message_queue is NULL)");
        result = NULL;
    }
    // Codes_SRS_MESSAGE_QUEUE_09_063: [An OPTIONHANDLER_HANDLE instance shall be created using OptionHandler_Create]
    else if ((result = OptionHandler_Create(cloneOption, destroyOption, setOption)) == NULL)
    {
        // Codes_SRS_MESSAGE_QUEUE_09_064: [If an OPTIONHANDLER_HANDLE instance fails to be created, message_queue_retrieve_options shall fail and return NULL]
        LogError("failed creating OPTIONHANDLER_HANDLE");
    }
    // Codes_SRS_MESSAGE_QUEUE_09_065: [Each option of `instance` shall be added to the OPTIONHANDLER_HANDLE instance using OptionHandler_AddOption]
    else if (OptionHandler_AddOption(result, SAVED_OPTION_MAX_ENQUEUE_TIME_SECS, &message_queue->max_message_enqueued_time_secs) != OPTIONHANDLER_OK)
    {
        LogError("failed retrieving options (failed adding %s)", SAVED_OPTION_MAX_ENQUEUE_TIME_SECS);
        // Codes_SRS_MESSAGE_QUEUE_09_067: [If message_queue_retrieve_options fails, any allocated memory shall be freed]
        OptionHandler_Destroy(result);
        // Codes_SRS_MESSAGE_QUEUE_09_066: [If OptionHandler_AddOption fails, message_queue_retrieve_options shall fail and return NULL]
        result = NULL;
    }
    else if (OptionHandler_AddOption(result, SAVED_OPTION_MAX_PROCESSING_TIME_SECS, &message_queue->max_message_processing_time_secs) != OPTIONHANDLER_OK)
    {
        LogError("failed retrieving options (failed adding %s)", SAVED_OPTION_MAX_PROCESSING_TIME_SECS);
        // Codes_SRS_MESSAGE_QUEUE_09_067: [If message_queue_retrieve_options fails, any allocated memory shall be freed]
        OptionHandler_Destroy(result);
        // Codes_SRS_MESSAGE_QUEUE_09_066: [If OptionHandler_AddOption fails, message_queue_retrieve_options shall fail and return NULL]
        result = NULL;
    }
    else if (OptionHandler_AddOption(result, SAVED_OPTION_MAX_RETRY_COUNT, &message_queue->max_retry_count) != OPTIONHANDLER_OK)
    {
        LogError("failed retrieving options (failed adding %s)", SAVED_OPTION_MAX_PROCESSING_TIME_SECS);
        // Codes_SRS_MESSAGE_QUEUE_09_067: [If message_queue_retrieve_options fails, any allocated memory shall be freed]
        OptionHandler_Destroy(result);
        // Codes_SRS_MESSAGE_QUEUE_09_066: [If OptionHandler_AddOption fails, message_queue_retrieve_options shall fail and return NULL]
        result = NULL;
    }

    // Codes_SRS_MESSAGE_QUEUE_09_068: [If no failures occur, message_queue_retrieve_options shall return the OPTIONHANDLER_HANDLE instance]
    return result;
}