MQTT

Overview

MQTT (Message Queuing Telemetry Transport) is an application layer protocol which works on top of the TCP/IP stack. It is a lightweight publish/subscribe messaging transport for machine-to-machine communication. For more information about the protocol itself, see http://mqtt.org/.

Zephyr provides an MQTT client library built on top of BSD sockets API. The library is configurable at a per-client basis, with support for MQTT versions 3.1.0 and 3.1.1. The Zephyr MQTT implementation can be used with either plain sockets communicating over TCP, or with secure sockets communicating over TLS. See BSD Sockets for more information about Zephyr sockets.

MQTT clients require an MQTT server to connect to. Such a server, called an MQTT Broker, is responsible for managing client subscriptions and distributing messages published by clients. There are many implementations of MQTT brokers, one of them being Eclipse Mosquitto. See https://mosquitto.org/ for more information about the Eclipse Mosquitto project.

Sample usage

To create an MQTT client, a client context structure and buffers need to be defined:

/* Buffers for MQTT client. */
static uint8_t rx_buffer[256];
static uint8_t tx_buffer[256];

/* MQTT client context */
static struct mqtt_client client_ctx;

Multiple MQTT client instances can be created in the application and managed independently. Additionally, a structure for MQTT Broker address information is needed. This structure must be accessible throughout the lifespan of the MQTT client and can be shared among MQTT clients:

/* MQTT Broker address information. */
static struct sockaddr_storage broker;

An MQTT client library will notify MQTT events to the application through a callback function created to handle respective events:

void mqtt_evt_handler(struct mqtt_client *client,
                      const struct mqtt_evt *evt)
{
   switch (evt->type) {
      /* Handle events here. */
   }
}

For a list of possible events, see API Reference.

The client context structure needs to be initialized and set up before it can be used. An example configuration for TCP transport is shown below:

mqtt_client_init(&client_ctx);

/* MQTT client configuration */
client_ctx.broker = &broker;
client_ctx.evt_cb = mqtt_evt_handler;
client_ctx.client_id.utf8 = (uint8_t *)"zephyr_mqtt_client";
client_ctx.client_id.size = sizeof("zephyr_mqtt_client") - 1;
client_ctx.password = NULL;
client_ctx.user_name = NULL;
client_ctx.protocol_version = MQTT_VERSION_3_1_1;
client_ctx.transport.type = MQTT_TRANSPORT_NON_SECURE;

/* MQTT buffers configuration */
client_ctx.rx_buf = rx_buffer;
client_ctx.rx_buf_size = sizeof(rx_buffer);
client_ctx.tx_buf = tx_buffer;
client_ctx.tx_buf_size = sizeof(tx_buffer);

After the configuration is set up, the MQTT client can connect to the MQTT broker. Call the mqtt_connect function, which will create the appropriate socket, establish a TCP/TLS connection, and send an MQTT CONNECT message. When notified, the application should call the mqtt_input function to process the response received. Note, that mqtt_input is a non-blocking function, therefore the application should use socket poll to wait for the response. If the connection was successful, MQTT_EVT_CONNACK will be notified to the application through the callback function.

rc = mqtt_connect(&client_ctx);
if (rc != 0) {
   return rc;
}

fds[0].fd = client_ctx.transport.tcp.sock;
fds[0].events = ZSOCK_POLLIN;
poll(fds, 1, K_MSEC(5000));

mqtt_input(&client_ctx);

if (!connected) {
   mqtt_abort(&client_ctx);
}

In the above code snippet, the MQTT callback function should set the connected flag upon a successful connection. If the connection fails at the MQTT level or a timeout occurs, the connection will be aborted, and the underlying socket closed.

After the connection is established, an application needs to call mqtt_input and mqtt_live functions periodically to process incoming data and upkeep the connection. If an MQTT message is received, an MQTT callback function will be called and an appropriate event notified.

The connection can be closed by calling the mqtt_disconnect function.

Zephyr provides sample code utilizing the MQTT client API. See MQTT Publisher for more information.

Using MQTT with TLS

The Zephyr MQTT library can be used with TLS transport for secure communication by selecting a secure transport type (MQTT_TRANSPORT_SECURE) and some additional configuration information:

client_ctx.transport.type = MQTT_TRANSPORT_SECURE;

struct mqtt_sec_config *tls_config = &client_ctx.transport.tls.config;

tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
tls_config->cipher_list = NULL;
tls_config->sec_tag_list = m_sec_tags;
tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
tls_config->hostname = MQTT_BROKER_HOSTNAME;

In this sample code, the m_sec_tags array holds a list of tags, referencing TLS credentials that the MQTT library should use for authentication. We do not specify cipher_list, to allow the use of all cipher suites available in the system. We set hostname field to broker hostname, which is required for server authentication. Finally, we enforce peer certificate verification by setting the peer_verify field.

Note, that TLS credentials referenced by the m_sec_tags array must be registered in the system first. For more information on how to do that, refer to secure sockets documentation.

An example of how to use TLS with MQTT is also present in MQTT Publisher.

API Reference

group mqtt_socket

MQTT Client Implementation.

MQTT Client’s Application interface is defined in this header.

Note

The implementation assumes TCP module is enabled.

Note

By default the implementation uses MQTT version 3.1.1.

Defines

MQTT_UTF8_LITERAL(literal)

Initialize UTF-8 encoded string from C literal string.

Use it as follows:

struct mqtt_utf8 password = MQTT_UTF8_LITERAL(“my_pass”);

Parameters
  • literal[in] Literal string from which to generate mqtt_utf8 object.

Typedefs

typedef void (*mqtt_evt_cb_t)(struct mqtt_client *client, const struct mqtt_evt *evt)

Asynchronous event notification callback registered by the application.

Param client

[in] Identifies the client for which the event is notified.

Param evt

[in] Event description along with result and associated parameters (if any).

Enums

enum mqtt_evt_type

MQTT Asynchronous Events notified to the application from the module through the callback registered by the application.

Values:

enumerator MQTT_EVT_CONNACK

Acknowledgment of connection request. Event result accompanying the event indicates whether the connection failed or succeeded.

enumerator MQTT_EVT_DISCONNECT

Disconnection Event. MQTT Client Reference is no longer valid once this event is received for the client.

enumerator MQTT_EVT_PUBLISH

Publish event received when message is published on a topic client is subscribed to.

Note

PUBLISH event structure only contains payload size, the payload data parameter should be ignored. Payload content has to be read manually with mqtt_read_publish_payload function.

enumerator MQTT_EVT_PUBACK

Acknowledgment for published message with QoS 1.

enumerator MQTT_EVT_PUBREC

Reception confirmation for published message with QoS 2.

enumerator MQTT_EVT_PUBREL

Release of published message with QoS 2.

enumerator MQTT_EVT_PUBCOMP

Confirmation to a publish release message with QoS 2.

enumerator MQTT_EVT_SUBACK

Acknowledgment to a subscribe request.

enumerator MQTT_EVT_UNSUBACK

Acknowledgment to a unsubscribe request.

enumerator MQTT_EVT_PINGRESP

Ping Response from server.

enum mqtt_version

MQTT version protocol level.

Values:

enumerator MQTT_VERSION_3_1_0 = 3

Protocol level for 3.1.0.

enumerator MQTT_VERSION_3_1_1 = 4

Protocol level for 3.1.1.

enum mqtt_qos

MQTT Quality of Service types.

Values:

enumerator MQTT_QOS_0_AT_MOST_ONCE = 0x00

Lowest Quality of Service, no acknowledgment needed for published message.

enumerator MQTT_QOS_1_AT_LEAST_ONCE = 0x01

Medium Quality of Service, if acknowledgment expected for published message, duplicate messages permitted.

enumerator MQTT_QOS_2_EXACTLY_ONCE = 0x02

Highest Quality of Service, acknowledgment expected and message shall be published only once. Message not published to interested parties unless client issues a PUBREL.

enum mqtt_conn_return_code

MQTT CONNACK return codes.

Values:

enumerator MQTT_CONNECTION_ACCEPTED = 0x00

Connection accepted.

enumerator MQTT_UNACCEPTABLE_PROTOCOL_VERSION = 0x01

The Server does not support the level of the MQTT protocol requested by the Client.

enumerator MQTT_IDENTIFIER_REJECTED = 0x02

The Client identifier is correct UTF-8 but not allowed by the Server.

enumerator MQTT_SERVER_UNAVAILABLE = 0x03

The Network Connection has been made but the MQTT service is unavailable.

enumerator MQTT_BAD_USER_NAME_OR_PASSWORD = 0x04

The data in the user name or password is malformed.

enumerator MQTT_NOT_AUTHORIZED = 0x05

The Client is not authorized to connect.

enum mqtt_suback_return_code

MQTT SUBACK return codes.

Values:

enumerator MQTT_SUBACK_SUCCESS_QoS_0 = 0x00

Subscription with QoS 0 succeeded.

enumerator MQTT_SUBACK_SUCCESS_QoS_1 = 0x01

Subscription with QoS 1 succeeded.

enumerator MQTT_SUBACK_SUCCESS_QoS_2 = 0x02

Subscription with QoS 2 succeeded.

enumerator MQTT_SUBACK_FAILURE = 0x80

Subscription for a topic failed.

enum mqtt_transport_type

MQTT transport type.

Values:

enumerator MQTT_TRANSPORT_NON_SECURE

Use non secure TCP transport for MQTT connection.

enumerator MQTT_TRANSPORT_NUM

Shall not be used as a transport type. Indicator of maximum transport types possible.

Functions

void mqtt_client_init(struct mqtt_client *client)

Initializes the client instance.

Note

Shall be called to initialize client structure, before setting any client parameters and before connecting to broker.

Parameters
  • client[in] Client instance for which the procedure is requested. Shall not be NULL.

int mqtt_connect(struct mqtt_client *client)

API to request new MQTT client connection.

Note

This memory is assumed to be resident until mqtt_disconnect is called.

Note

Any subsequent changes to parameters like broker address, user name, device id, etc. have no effect once MQTT connection is established.

Note

Default protocol revision used for connection request is 3.1.1. Please set client.protocol_version = MQTT_VERSION_3_1_0 to use protocol 3.1.0.

Note

Please modify CONFIG_MQTT_KEEPALIVE time to override default of 1 minute.

Parameters
  • client[in] Client instance for which the procedure is requested. Shall not be NULL.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_publish(struct mqtt_client *client, const struct mqtt_publish_param *param)

API to publish messages on topics.

Parameters
  • client[in] Client instance for which the procedure is requested. Shall not be NULL.

  • param[in] Parameters to be used for the publish message. Shall not be NULL.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_publish_qos1_ack(struct mqtt_client *client, const struct mqtt_puback_param *param)

API used by client to send acknowledgment on receiving QoS1 publish message. Should be called on reception of MQTT_EVT_PUBLISH with QoS level MQTT_QOS_1_AT_LEAST_ONCE.

Parameters
  • client[in] Client instance for which the procedure is requested. Shall not be NULL.

  • param[in] Identifies message being acknowledged.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_publish_qos2_receive(struct mqtt_client *client, const struct mqtt_pubrec_param *param)

API used by client to send acknowledgment on receiving QoS2 publish message. Should be called on reception of MQTT_EVT_PUBLISH with QoS level MQTT_QOS_2_EXACTLY_ONCE.

Parameters
  • client[in] Identifies client instance for which the procedure is requested. Shall not be NULL.

  • param[in] Identifies message being acknowledged.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_publish_qos2_release(struct mqtt_client *client, const struct mqtt_pubrel_param *param)

API used by client to request release of QoS2 publish message. Should be called on reception of MQTT_EVT_PUBREC.

Parameters
  • client[in] Client instance for which the procedure is requested. Shall not be NULL.

  • param[in] Identifies message being released.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_publish_qos2_complete(struct mqtt_client *client, const struct mqtt_pubcomp_param *param)

API used by client to send acknowledgment on receiving QoS2 publish release message. Should be called on reception of MQTT_EVT_PUBREL.

Parameters
  • client[in] Identifies client instance for which the procedure is requested. Shall not be NULL.

  • param[in] Identifies message being completed.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_subscribe(struct mqtt_client *client, const struct mqtt_subscription_list *param)

API to request subscription of one or more topics on the connection.

Parameters
  • client[in] Identifies client instance for which the procedure is requested. Shall not be NULL.

  • param[in] Subscription parameters. Shall not be NULL.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_unsubscribe(struct mqtt_client *client, const struct mqtt_subscription_list *param)

API to request unsubscription of one or more topics on the connection.

Note

QoS included in topic description is unused in this API.

Parameters
  • client[in] Identifies client instance for which the procedure is requested. Shall not be NULL.

  • param[in] Parameters describing topics being unsubscribed from. Shall not be NULL.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_ping(struct mqtt_client *client)

API to send MQTT ping. The use of this API is optional, as the library handles the connection keep-alive on it’s own, see mqtt_live.

Parameters
  • client[in] Identifies client instance for which procedure is requested.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_disconnect(struct mqtt_client *client)

API to disconnect MQTT connection.

Parameters
  • client[in] Identifies client instance for which procedure is requested.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_abort(struct mqtt_client *client)

API to abort MQTT connection. This will close the corresponding transport without closing the connection gracefully at the MQTT level (with disconnect message).

Parameters
  • client[in] Identifies client instance for which procedure is requested.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_live(struct mqtt_client *client)

This API should be called periodically for the client to be able to keep the connection alive by sending Ping Requests if need be.

Note

Application shall ensure that the periodicity of calling this function makes it possible to respect the Keep Alive time agreed with the broker on connection. mqtt_connect for details on Keep Alive time.

Parameters
  • client[in] Client instance for which the procedure is requested. Shall not be NULL.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_keepalive_time_left(const struct mqtt_client *client)

Helper function to determine when next keep alive message should be sent. Can be used for instance as a source for poll timeout.

Parameters
  • client[in] Client instance for which the procedure is requested.

Returns

Time in milliseconds until next keep alive message is expected to be sent. Function will return -1 if keep alive messages are not enabled.

int mqtt_input(struct mqtt_client *client)

Receive an incoming MQTT packet. The registered callback will be called with the packet content.

Note

In case of PUBLISH message, the payload has to be read separately with mqtt_read_publish_payload function. The size of the payload to read is provided in the publish event structure.

Note

This is a non-blocking call.

Parameters
  • client[in] Client instance for which the procedure is requested. Shall not be NULL.

Returns

0 or a negative error code (errno.h) indicating reason of failure.

int mqtt_read_publish_payload(struct mqtt_client *client, void *buffer, size_t length)

Read the payload of the received PUBLISH message. This function should be called within the MQTT event handler, when MQTT PUBLISH message is notified.

Note

This is a non-blocking call.

Parameters
  • client[in] Client instance for which the procedure is requested. Shall not be NULL.

  • buffer[out] Buffer where payload should be stored.

  • length[in] Length of the buffer, in bytes.

Returns

Number of bytes read or a negative error code (errno.h) indicating reason of failure.

int mqtt_read_publish_payload_blocking(struct mqtt_client *client, void *buffer, size_t length)

Blocking version of mqtt_read_publish_payload function.

Parameters
  • client[in] Client instance for which the procedure is requested. Shall not be NULL.

  • buffer[out] Buffer where payload should be stored.

  • length[in] Length of the buffer, in bytes.

Returns

Number of bytes read or a negative error code (errno.h) indicating reason of failure.

int mqtt_readall_publish_payload(struct mqtt_client *client, uint8_t *buffer, size_t length)

Blocking version of mqtt_read_publish_payload function which runs until the required number of bytes are read.

Parameters
  • client[in] Client instance for which the procedure is requested. Shall not be NULL.

  • buffer[out] Buffer where payload should be stored.

  • length[in] Number of bytes to read.

Returns

0 if success, otherwise a negative error code (errno.h) indicating reason of failure.

struct mqtt_utf8
#include <mqtt.h>

Abstracts UTF-8 encoded strings.

Public Members

const uint8_t *utf8

Pointer to UTF-8 string.

uint32_t size

Size of UTF string, in bytes.

struct mqtt_binstr
#include <mqtt.h>

Abstracts binary strings.

Public Members

uint8_t *data

Pointer to binary stream.

uint32_t len

Length of binary stream.

struct mqtt_topic
#include <mqtt.h>

Abstracts MQTT UTF-8 encoded topic that can be subscribed to or published.

Public Members

struct mqtt_utf8 topic

Topic on to be published or subscribed to.

uint8_t qos

Quality of service requested for the subscription. mqtt_qos for details.

struct mqtt_publish_message
#include <mqtt.h>

Parameters for a publish message.

Public Members

struct mqtt_topic topic

Topic on which data was published.

struct mqtt_binstr payload

Payload on the topic published.

struct mqtt_connack_param
#include <mqtt.h>

Parameters for a connection acknowledgment (CONNACK).

Public Members

uint8_t session_present_flag

The Session Present flag enables a Client to establish whether the Client and Server have a consistent view about whether there is already stored Session state.

enum mqtt_conn_return_code return_code

The appropriate non-zero Connect return code indicates if the Server is unable to process a connection request for some reason.

struct mqtt_puback_param
#include <mqtt.h>

Parameters for MQTT publish acknowledgment (PUBACK).

struct mqtt_pubrec_param
#include <mqtt.h>

Parameters for MQTT publish receive (PUBREC).

struct mqtt_pubrel_param
#include <mqtt.h>

Parameters for MQTT publish release (PUBREL).

struct mqtt_pubcomp_param
#include <mqtt.h>

Parameters for MQTT publish complete (PUBCOMP).

struct mqtt_suback_param
#include <mqtt.h>

Parameters for MQTT subscription acknowledgment (SUBACK).

struct mqtt_unsuback_param
#include <mqtt.h>

Parameters for MQTT unsubscribe acknowledgment (UNSUBACK).

struct mqtt_publish_param
#include <mqtt.h>

Parameters for a publish message.

Public Members

struct mqtt_publish_message message

Messages including topic, QoS and its payload (if any) to be published.

uint16_t message_id

Message id used for the publish message. Redundant for QoS 0.

uint8_t dup_flag

Duplicate flag. If 1, it indicates the message is being retransmitted. Has no meaning with QoS 0.

uint8_t retain_flag

Retain flag. If 1, the message shall be stored persistently by the broker.

struct mqtt_subscription_list
#include <mqtt.h>

List of topics in a subscription request.

Public Members

struct mqtt_topic *list

Array containing topics along with QoS for each.

uint16_t list_count

Number of topics in the subscription list

uint16_t message_id

Message id used to identify subscription request.

union mqtt_evt_param
#include <mqtt.h>

Defines event parameters notified along with asynchronous events to the application.

Public Members

struct mqtt_connack_param connack

Parameters accompanying MQTT_EVT_CONNACK event.

struct mqtt_publish_param publish

Parameters accompanying MQTT_EVT_PUBLISH event.

Note

PUBLISH event structure only contains payload size, the payload data parameter should be ignored. Payload content has to be read manually with mqtt_read_publish_payload function.

struct mqtt_puback_param puback

Parameters accompanying MQTT_EVT_PUBACK event.

struct mqtt_pubrec_param pubrec

Parameters accompanying MQTT_EVT_PUBREC event.

struct mqtt_pubrel_param pubrel

Parameters accompanying MQTT_EVT_PUBREL event.

struct mqtt_pubcomp_param pubcomp

Parameters accompanying MQTT_EVT_PUBCOMP event.

struct mqtt_suback_param suback

Parameters accompanying MQTT_EVT_SUBACK event.

struct mqtt_unsuback_param unsuback

Parameters accompanying MQTT_EVT_UNSUBACK event.

struct mqtt_evt
#include <mqtt.h>

Defines MQTT asynchronous event notified to the application.

Public Members

enum mqtt_evt_type type

Identifies the event.

union mqtt_evt_param param

Contains parameters (if any) accompanying the event.

int result

Event result. 0 or a negative error code (errno.h) indicating reason of failure.

struct mqtt_sec_config
#include <mqtt.h>

TLS configuration for secure MQTT transports.

Public Members

int peer_verify

Indicates the preference for peer verification.

uint32_t cipher_count

Indicates the number of entries in the cipher list.

int *cipher_list

Indicates the list of ciphers to be used for the session. May be NULL to use the default ciphers.

uint32_t sec_tag_count

Indicates the number of entries in the sec tag list.

sec_tag_t *sec_tag_list

Indicates the list of security tags to be used for the session.

int session_cache

Indicates the preference for enabling TLS session caching.

const char *hostname

Peer hostname for ceritificate verification. May be NULL to skip hostname verification.

int cert_nocopy

Indicates the preference for copying certificates to the heap.

struct mqtt_transport
#include <mqtt.h>

MQTT transport specific data.

Public Members

enum mqtt_transport_type type

Transport type selection for client instance. mqtt_transport_type for possible values. MQTT_TRANSPORT_MAX is not a valid type.

int sock

Socket descriptor.

struct mqtt_internal
#include <mqtt.h>

MQTT internal state.

Public Members

struct sys_mutex mutex

Internal. Mutex to protect access to the client instance.

uint32_t last_activity

Internal. Wall clock value (in milliseconds) of the last activity that occurred. Needed for periodic PING.

uint32_t state

Internal. Client’s state in the connection.

uint32_t rx_buf_datalen

Internal. Packet length read so far.

uint32_t remaining_payload

Internal. Remaining payload length to read.

struct mqtt_client
#include <mqtt.h>

MQTT Client definition to maintain information relevant to the client.

Public Members

struct mqtt_internal internal

MQTT client internal state.

struct mqtt_transport transport

MQTT transport configuration and data.

struct mqtt_utf8 client_id

Unique client identification to be used for the connection.

const void *broker

Broker details, for example, address, port. Address type should be compatible with transport used.

struct mqtt_utf8 *user_name

User name (if any) to be used for the connection. NULL indicates no user name.

struct mqtt_utf8 *password

Password (if any) to be used for the connection. Note that if password is provided, user name shall also be provided. NULL indicates no password.

struct mqtt_topic *will_topic

Will topic and QoS. Can be NULL.

struct mqtt_utf8 *will_message

Will message. Can be NULL. Non NULL value valid only if will topic is not NULL.

mqtt_evt_cb_t evt_cb

Application callback registered with the module to get MQTT events.

uint8_t *rx_buf

Receive buffer used for MQTT packet reception in RX path.

uint32_t rx_buf_size

Size of receive buffer.

uint8_t *tx_buf

Transmit buffer used for creating MQTT packet in TX path.

uint32_t tx_buf_size

Size of transmit buffer.

uint16_t keepalive

Keepalive interval for this client in seconds. Default is CONFIG_MQTT_KEEPALIVE.

uint8_t protocol_version

MQTT protocol version.

int8_t unacked_ping

Unanswered PINGREQ count on this connection.

uint8_t will_retain

Will retain flag, 1 if will message shall be retained persistently.

uint8_t clean_session

Clean session flag indicating a fresh (1) or a retained session (0). Default is CONFIG_MQTT_CLEAN_SESSION.