# Copyright (c) 2012-2014 Roger Light <roger@atchoo.org>
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Eclipse Public License v1.0
# and Eclipse Distribution License v1.0 which accompany this distribution.
#
# The Eclipse Public License is available at
# http://www.eclipse.org/legal/epl-v10.html
# and the Eclipse Distribution License is available at
# http://www.eclipse.org/org/documents/edl-v10.php.
#
# Contributors:
# Roger Light - initial API and implementation
"""
This is an MQTT v3.1 client module. MQTT is a lightweight pub/sub messaging
protocol that is easy to implement and suitable for low powered devices.
"""
import collections
import errno
import os
import platform
import select
import socket
try:
import ssl
except ImportError:
ssl = None
import struct
import sys
import threading
import time
import uuid
import base64
import string
import hashlib
import logging
try:
# Use monotonic clock if available
time_func = time.monotonic
except AttributeError:
time_func = time.time
try:
import dns.resolver
except ImportError:
HAVE_DNS = False
else:
HAVE_DNS = True
from .matcher import MQTTMatcher
if platform.system() == 'Windows':
EAGAIN = errno.WSAEWOULDBLOCK
else:
EAGAIN = errno.EAGAIN
MQTTv31 = 3
MQTTv311 = 4
if sys.version_info[0] >= 3:
# define some alias for python2 compatibility
unicode = str
basestring = str
# Message types
CONNECT = 0x10
CONNACK = 0x20
PUBLISH = 0x30
PUBACK = 0x40
PUBREC = 0x50
PUBREL = 0x60
PUBCOMP = 0x70
SUBSCRIBE = 0x80
SUBACK = 0x90
UNSUBSCRIBE = 0xA0
UNSUBACK = 0xB0
PINGREQ = 0xC0
PINGRESP = 0xD0
DISCONNECT = 0xE0
# Log levels
MQTT_LOG_INFO = 0x01
MQTT_LOG_NOTICE = 0x02
MQTT_LOG_WARNING = 0x04
MQTT_LOG_ERR = 0x08
MQTT_LOG_DEBUG = 0x10
LOGGING_LEVEL = {
MQTT_LOG_DEBUG: logging.DEBUG,
MQTT_LOG_INFO: logging.INFO,
MQTT_LOG_NOTICE: logging.INFO, # This has no direct equivalent level
MQTT_LOG_WARNING: logging.WARNING,
MQTT_LOG_ERR: logging.ERROR,
}
# CONNACK codes
CONNACK_ACCEPTED = 0
CONNACK_REFUSED_PROTOCOL_VERSION = 1
CONNACK_REFUSED_IDENTIFIER_REJECTED = 2
CONNACK_REFUSED_SERVER_UNAVAILABLE = 3
CONNACK_REFUSED_BAD_USERNAME_PASSWORD = 4
CONNACK_REFUSED_NOT_AUTHORIZED = 5
# Connection state
mqtt_cs_new = 0
mqtt_cs_connected = 1
mqtt_cs_disconnecting = 2
mqtt_cs_connect_async = 3
# Message state
mqtt_ms_invalid = 0
mqtt_ms_publish = 1
mqtt_ms_wait_for_puback = 2
mqtt_ms_wait_for_pubrec = 3
mqtt_ms_resend_pubrel = 4
mqtt_ms_wait_for_pubrel = 5
mqtt_ms_resend_pubcomp = 6
mqtt_ms_wait_for_pubcomp = 7
mqtt_ms_send_pubrec = 8
mqtt_ms_queued = 9
# Error values
MQTT_ERR_AGAIN = -1
MQTT_ERR_SUCCESS = 0
MQTT_ERR_NOMEM = 1
MQTT_ERR_PROTOCOL = 2
MQTT_ERR_INVAL = 3
MQTT_ERR_NO_CONN = 4
MQTT_ERR_CONN_REFUSED = 5
MQTT_ERR_NOT_FOUND = 6
MQTT_ERR_CONN_LOST = 7
MQTT_ERR_TLS = 8
MQTT_ERR_PAYLOAD_SIZE = 9
MQTT_ERR_NOT_SUPPORTED = 10
MQTT_ERR_AUTH = 11
MQTT_ERR_ACL_DENIED = 12
MQTT_ERR_UNKNOWN = 13
MQTT_ERR_ERRNO = 14
MQTT_ERR_QUEUE_SIZE = 15
MQTT_CLIENT = 0
MQTT_BRIDGE = 1
sockpair_data = b"0"
class WebsocketConnectionError(ValueError):
pass
class WouldBlockError(Exception):
pass
def error_string(mqtt_errno):
"""Return the error string associated with an mqtt error number."""
if mqtt_errno == MQTT_ERR_SUCCESS:
return "No error."
elif mqtt_errno == MQTT_ERR_NOMEM:
return "Out of memory."
elif mqtt_errno == MQTT_ERR_PROTOCOL:
return "A network protocol error occurred when communicating with the broker."
elif mqtt_errno == MQTT_ERR_INVAL:
return "Invalid function arguments provided."
elif mqtt_errno == MQTT_ERR_NO_CONN:
return "The client is not currently connected."
elif mqtt_errno == MQTT_ERR_CONN_REFUSED:
return "The connection was refused."
elif mqtt_errno == MQTT_ERR_NOT_FOUND:
return "Message not found (internal error)."
elif mqtt_errno == MQTT_ERR_CONN_LOST:
return "The connection was lost."
elif mqtt_errno == MQTT_ERR_TLS:
return "A TLS error occurred."
elif mqtt_errno == MQTT_ERR_PAYLOAD_SIZE:
return "Payload too large."
elif mqtt_errno == MQTT_ERR_NOT_SUPPORTED:
return "This feature is not supported."
elif mqtt_errno == MQTT_ERR_AUTH:
return "Authorisation failed."
elif mqtt_errno == MQTT_ERR_ACL_DENIED:
return "Access denied by ACL."
elif mqtt_errno == MQTT_ERR_UNKNOWN:
return "Unknown error."
elif mqtt_errno == MQTT_ERR_ERRNO:
return "Error defined by errno."
else:
return "Unknown error."
def connack_string(connack_code):
"""Return the string associated with a CONNACK result."""
if connack_code == CONNACK_ACCEPTED:
return "Connection Accepted."
elif connack_code == CONNACK_REFUSED_PROTOCOL_VERSION:
return "Connection Refused: unacceptable protocol version."
elif connack_code == CONNACK_REFUSED_IDENTIFIER_REJECTED:
return "Connection Refused: identifier rejected."
elif connack_code == CONNACK_REFUSED_SERVER_UNAVAILABLE:
return "Connection Refused: broker unavailable."
elif connack_code == CONNACK_REFUSED_BAD_USERNAME_PASSWORD:
return "Connection Refused: bad user name or password."
elif connack_code == CONNACK_REFUSED_NOT_AUTHORIZED:
return "Connection Refused: not authorised."
else:
return "Connection Refused: unknown reason."
def base62(num, base=string.digits + string.ascii_letters, padding=1):
"""Convert a number to base-62 representation."""
assert num >= 0
digits = []
while num:
num, rest = divmod(num, 62)
digits.append(base[rest])
digits.extend(base[0] for _ in range(len(digits), padding))
return ''.join(reversed(digits))
def topic_matches_sub(sub, topic):
"""Check whether a topic matches a subscription.
For example:
foo/bar would match the subscription foo/# or +/bar
non/matching would not match the subscription non/+/+
"""
matcher = MQTTMatcher()
matcher[sub] = True
try:
next(matcher.iter_match(topic))
return True
except StopIteration:
return False
def _socketpair_compat():
"""TCP/IP socketpair including Windows support"""
listensock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
listensock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listensock.bind(("127.0.0.1", 0))
listensock.listen(1)
iface, port = listensock.getsockname()
sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_IP)
sock1.setblocking(0)
try:
sock1.connect(("127.0.0.1", port))
except socket.error as err:
if err.errno != errno.EINPROGRESS and err.errno != errno.EWOULDBLOCK and err.errno != EAGAIN:
raise
sock2, address = listensock.accept()
sock2.setblocking(0)
listensock.close()
return (sock1, sock2)
class MQTTMessageInfo(object):
"""This is a class returned from Client.publish() and can be used to find
out the mid of the message that was published, and to determine whether the
message has been published, and/or wait until it is published.
"""
__slots__ = 'mid', '_published', '_condition', 'rc', '_iterpos'
def __init__(self, mid):
self.mid = mid
self._published = False
self._condition = threading.Condition()
self.rc = 0
self._iterpos = 0
def __str__(self):
return str((self.rc, self.mid))
def __iter__(self):
self._iterpos = 0
return self
def __next__(self):
return self.next()
def next(self):
if self._iterpos == 0:
self._iterpos = 1
return self.rc
elif self._iterpos == 1:
self._iterpos = 2
return self.mid
else:
raise StopIteration
def __getitem__(self, index):
if index == 0:
return self.rc
elif index == 1:
return self.mid
else:
raise IndexError("index out of range")
def _set_as_published(self):
with self._condition:
self._published = True
self._condition.notify()
def wait_for_publish(self):
"""Block until the message associated with this object is published."""
if self.rc == MQTT_ERR_QUEUE_SIZE:
raise ValueError('Message is not queued due to ERR_QUEUE_SIZE')
with self._condition:
while not self._published:
self._condition.wait()
def is_published(self):
"""Returns True if the message associated with this object has been
published, else returns False."""
if self.rc == MQTT_ERR_QUEUE_SIZE:
raise ValueError('Message is not queued due to ERR_QUEUE_SIZE')
with self._condition:
return self._published
class MQTTMessage(object):
""" This is a class that describes an incoming or outgoing message. It is
passed to the on_message callback as the message parameter.
Members:
topic : String/bytes. topic that the message was published on.
payload : String/bytes the message payload.
qos : Integer. The message Quality of Service 0, 1 or 2.
retain : Boolean. If true, the message is a retained message and not fresh.
mid : Integer. The message id.
On Python 3, topic must be bytes.
"""
__slots__ = 'timestamp', 'state', 'dup', 'mid', '_topic', 'payload', 'qos', 'retain', 'info'
def __init__(self, mid=0, topic=b""):
self.timestamp = 0
self.state = mqtt_ms_invalid
Loading ...