In the first three articles in this series I covered getting a 6LoWPAN wireless network to work on Raspberry Pis, making IoT resources available on the IPv6 network, and the CoAP protocol used for exchanging information between sensors/actuators and clients.
In this final article I look at two more topics addressed by CoAP: how clients can locate CoAP endpoints using a directory service, and how clients can register themselves with an endpoint so that changes in the sensors, such as a change of temperature, are published to them.
In this section I describe the proposed resource directory for CoAP. It is an IETF draft, which means it might change, but probably not significantly.
I have left a hole so far: the sensors (or actuators) boot up and, courtesy of the radvd daemon, assign themselves a routable IPv6 address, but no-one else knows what that address is. I know what the address is for my RPi's, because I hook up a keyboard and monitor and do an ifconfig query. Then I can use that address elsewhere. But if it is a random sensor or actuator, I am unlikely to be able to do that.
The IETF draft CoRE Resource Directory (draft-ietf-core-resource-directory-09) describes a directory system for registering and looking up sensors and actuators. It describes how to find directories, how to register resources with them and how to use them to look up and discover resources.
The gateway we set up in the last article would be an obvious place to store such a directory since it is already on the global internet with a fixed routable IPv6 address, and this is one of the ways that the draft recommends. We will assume the gateway is also the resource directory.
Since the gateway is a CoAP node, it will have a list of its own resources at the URL /.well-known/core (often just called WK/C). What it does as a directory is to add additional resources with the following types:
core.rd: this type is for the resource directory (RD) itself. Servers (sensors, actuators, etc.) register with this resource.
core.rd-lookup: this type of resource is used by clients to find the registered resources.
core.rd-group: this type of resource can be used to group other resources, such as all the sensors and actuators in a car, or all of the temperature sensors in a building. I won't discuss this one further.
The gateway adds these resources just like any others:
    root.add_resource(('rd',), ResourceDirectory())
    root.add_resource(('rd-lookup',), ResourceDirectoryLookup())
    root.add_resource(('rd-group',), ResourceDirectoryGroup())
where ResourceDirectory, ResourceDirectoryLookup and ResourceDirectoryGroup are three new classes to perform the directory functions. These have resource types rt of core.rd, core.rd-lookup and core.rd-group respectively, and /rd, /rd-lookup and /rd-group are the respective URL paths to access them. I will discuss these classes later.
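To make the directory entries concrete, here is a minimal sketch of how those three resources might appear in the gateway's WK/C listing, which uses CoRE Link Format. The helper to_link_format is hypothetical and is not part of aiocoap; a real listing would also contain the gateway's other resources and possibly more attributes.

```python
# Hypothetical helper (not an aiocoap function): render (path, resource type)
# pairs as a CoRE Link Format string, one <path>;rt="type" entry per resource.
def to_link_format(entries):
    return ','.join('<{}>;rt="{}"'.format(path, rt) for path, rt in entries)

# The three directory resources and their types, as described in the text.
RD_ENTRIES = [
    ('/rd', 'core.rd'),
    ('/rd-lookup', 'core.rd-lookup'),
    ('/rd-group', 'core.rd-group'),
]

wkc_listing = to_link_format(RD_ENTRIES)
print(wkc_listing)
```

An endpoint or client reading this listing only needs to match on the rt attribute to find the path it wants.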
There is a simple way and a more complex way for an endpoint to register its resources. The complex way is to look up WK/C to find the core.rd URL and then POST to this URL. The simpler way, which may reduce the processing load on the endpoint, is just to make an empty POST to WK/C.
That's where I hit a problem with this: why is the POST empty? Why doesn't it contain the resource descriptions as payload? The request seems to be expected to be registered as an empty endpoint and return a reference to this empty endpoint. Following that, the RD is expected to make a GET query on the endpoint's WK/C (at some indeterminate future time) and presumably fill the empty directory value with its return list.
The spec isn't quite clear and seems a bit bizarre compared to the more complex method.
Of the two ways that an endpoint server can register its resources with an RD, I'm not completely happy with the 'simple' one, and since we need the more complex one anyway, I'll just describe that.
An endpoint trying to register resources in the RD will need to find what the registration URL is on the RD's server. To do this it will query the RD server's /.well-known/core to get the URL for the resource type core.rd. A client wanting to do lookups will query the RD server's /.well-known/core to get the URL for the resource type core.rd-lookup.
Generic code to find these first gets the list of resources from the WK/C by
    request = Message(code=GET)
    request.set_request_uri('coap://[fd28::1]/.well-known/core')

    # get the url path for the RD lookup resource
    try:
        response = yield from protocol.request(request).response
    except Exception as e:
        print('Failed to fetch resource:', e)
        sys.exit(1)
The format of the returned value is a list of URL links in CoRE Link Format as described in RFC 6690. These are best parsed using the LinkHeader Python package as follows:
    link = response.payload.decode('utf-8')
    (rd_href, rd_lookup_href, rd_group_href) = \
        get_resource_directory_urls(link_header.parse(link))
where
    def get_resource_directory_urls(links):
        rd_href = rd_lookup_href = rd_group_href = None
        for link in links.links:
            for (attr, value) in link.attr_pairs:
                # the resource type tells us which directory role this link has
                if attr == 'rt':
                    if value == 'core.rd':
                        rd_href = link.href
                    elif value == 'core.rd-lookup':
                        rd_lookup_href = link.href
                    elif value == 'core.rd-group':
                        rd_group_href = link.href
        return (rd_href, rd_lookup_href, rd_group_href)
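If the LinkHeader package is not available, the same lookup can be approximated with the standard library. The following is only a sketch: parse_links and find_by_rt are hypothetical names, and the regular expressions assume that quoted attribute values contain no double quotes and that link targets contain no '>' character, which covers the simple listings used here but not every legal RFC 6690 document.

```python
import re

# One link: <target> followed by zero or more ;name=value parameters.
LINK_RE = re.compile(
    r'<([^>]*)>((?:\s*;\s*[^;,=\s]+(?:\s*=\s*(?:"[^"]*"|[^;,\s]*))?)*)')
# One parameter: ;name, optionally followed by ="quoted" or =bare.
PARAM_RE = re.compile(r';\s*([^;,=\s]+)(?:\s*=\s*(?:"([^"]*)"|([^;,\s]*)))?')

def parse_links(text):
    """Return a list of (href, {attribute: value}) pairs."""
    links = []
    for match in LINK_RE.finditer(text):
        href, params = match.group(1), match.group(2)
        attrs = {}
        for name, quoted, bare in PARAM_RE.findall(params):
            attrs[name] = quoted if quoted else bare
        links.append((href, attrs))
    return links

def find_by_rt(text, wanted):
    """Return the href of the first link whose rt includes `wanted`."""
    for href, attrs in parse_links(text):
        # rt may hold several space-separated resource types
        if wanted in attrs.get('rt', '').split():
            return href
    return None

sample = ('</rd>;rt="core.rd",</rd-lookup>;rt="core.rd-lookup",'
          '</temperature>;rt="jan.newmarch:temperature-sensor";'
          'if="https://jan.newmarch.name/temperature-sensor"')
print(find_by_rt(sample, 'core.rd-lookup'))
```

For anything beyond simple listings a proper Link Format parser such as LinkHeader remains the safer choice.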
An endpoint registers its resources to the RD by POST'ing the resources to the RD's rd_href. These resources are the payload in Link Format. In addition, generally two extra parameters are given as request query parameters to the POST request (this is unlike the practice recommended for HTML POST query strings). The first of these parameters is an endpoint name such as "RPi in my study" and the second is often the URL of the endpoint to connect to, such as coap://[fd28::2]. The first is required in the specification, to allow endpoints to be searched for by name. The second is often advisable: the endpoint may be communicating to the RD using a link-local address, and if it wants to be visible from outside this local link, it will need to have a unique local or global address. This parameter allows this to be sent.
The POST will generally return a URL, which is the URL of the resource's registration on the RD. This can be used later if needed to update or delete the RD entry.
The code for this is
    @asyncio.coroutine
    def register_resource():
        context = yield from aiocoap.Context.create_client_context()

        # the resource description, in Link Format
        payload = '</temperature>;' +\
                  'rt="jan.newmarch:temperature-sensor";' +\
                  'if="https://jan.newmarch.name/temperature-sensor"'

        request = aiocoap.Message(code=aiocoap.POST,
                                  payload=payload.encode('utf-8'))
        request.set_request_uri('coap://[fd28::1]/rd?ep="RPi in my study"&con="coap://[fd28::2]"')

        # create the new resource on the directory
        response = yield from context.request(request).response
        # the payload is the URL of this registration on the RD
        created_uri = response.payload.decode('utf-8')
The URL returned from registering the endpoint can be used to update or delete the endpoint's services. We omit this code, except to say that the update is performed using a CoAP POST (it should really be PUT in my opinion, but PUT vs POST is a debatable issue in REST) and the delete is by a CoAP DELETE.
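The registration URI carries an endpoint name containing spaces in its query string. A minimal sketch of building such a URI with percent-encoding, using only the standard library, is below. The function registration_uri is hypothetical, and whether a particular CoAP library decodes the percent-encoding before placing the values in Uri-Query options is an assumption that should be checked against that library.

```python
from urllib.parse import quote, urlencode

def registration_uri(rd_base, rd_href, endpoint_name, context=None):
    """Build the URI for a registration POST.

    rd_base is the RD's scheme and address, rd_href the path found via WK/C,
    endpoint_name the 'ep' parameter and context the optional 'con' parameter.
    """
    params = {'ep': endpoint_name}
    if context is not None:
        params['con'] = context
    # quote_via=quote encodes spaces as %20 rather than '+'
    return rd_base + rd_href + '?' + urlencode(params, quote_via=quote)

uri = registration_uri('coap://[fd28::1]', '/rd',
                       'RPi in my study', 'coap://[fd28::2]')
print(uri)
```

Encoding the parameters this way avoids embedding quote characters in the values, which the directory otherwise has to strip off again.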
We can probably assume that a client trying to locate resources, and the gateway acting as directory, will have enough compute resources to engage in complex dialogues. The client trying to locate resources will need the gateway's address (assumed to be known somehow) and will then use the WK/C to find the URL of the directory lookup (core.rd-lookup).
The RD has some flexibility in the lookup requests it can manage. It can handle lookups by endpoint name, by endpoint group, by resource, and so on. I will only consider lookups by resource. The RD lookup's URL path will be something like /rd-lookup, as obtained earlier. The resource lookup path will have /res appended: /rd-lookup/res.
A client can then ask for all resources known to this directory by a GET request:
    GET coap://[fd28::1]/rd-lookup/res
with response such as
    Res: 2.05 Content
    <coap://[FD28::2]/temperature>;rt="jan.newmarch:temperature-sensor";
    if="https://jan.newmarch.name/temperature-sensor"
The additional code for the client to find what resources are in the directory, to search for a specific type (e.g. jan.newmarch:temperature-sensor) and then get the temperature looks like this
    # Find the remote resources in the RD
    request = Message(code=GET)
    request.set_request_uri('coap://[fd28::1]' + rd_lookup_href + '/res')
    try:
        response = yield from protocol.request(request).response
    except Exception as e:
        print('Failed to fetch resource:', e)
        sys.exit(1)

    # now find a temperature sensor
    links = link_header.parse(response.payload.decode('utf-8'))
    for link in links.links:
        for (tag, value) in link.attr_pairs:
            if tag == 'rt' and value == 'jan.newmarch:temperature-sensor':
                # we've got a temp sensor, look it up
                href = link.href
                request = Message(code=GET)
                request.set_request_uri(href)
                try:
                    response = yield from protocol.request(request).response
                except Exception as e:
                    print('Failed to fetch resource:', e)
                    sys.exit(1)
                print('Result: %s\n%r'%(response.code, cbor.loads(response.payload)))
This should produce something like
    Result: 2.05 Content
    {'unit': 'C', 'temperature': '36.3'}
The RD on a server will have the three resources for core, lookup and group. We have only talked about the core and lookup, and continue to ignore the group. Implementing the server requires defining classes for these and registering them on the server.
But lookups can be by resource, by endpoint and several other options. These each have their own resources on the server, such as /rd-lookup/res for lookups by resource. The implementation will need classes for all of these:
ResourceDirectory
This class must handle registration requests. It needs to respond to POST requests, and will need to store the registration information in a directory list. The registration information is stored in a class RemoteResource with attributes remote_host (used as a key in the directory), the endpoint name and the payload of the resource description in Link Format. The remote resource class is
    class RemoteResource(resource.Resource):
        def __init__(self, request):
            super(RemoteResource, self).__init__()
            # We use this as the key in a map to this object
            self.remote_host = request.remote.sockaddr[0]

            # Get query options
            self.url = self.ep = None
            for query_elmt in request.opt.uri_query:
                # split on the first '=' only, as values may contain '='
                tag, _, value = query_elmt.partition('=')
                if tag == 'con':
                    self.url = value
                elif tag == 'ep':
                    self.ep = value

            # Fix up url if it wasn't set by 'con' option
            if self.url is None:
                port = request.opt.uri_port if request.opt.uri_port else DEFAULT_PORT
                self.url = 'coap://[' + self.remote_host + ']:' + str(port)

            # Assume the format is utf-8
            payload = request.payload.decode('utf-8')

            # now change the relative URLs to absolute URLs
            # using LinkHeader to manage the links
            links = link_header.parse(payload)
            for link in links.links:
                # lose " from string - why do they come in?
                link.href = self.url.strip('"') + link.href

            # convert back to string
            self.payload = str(links)
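The step that matters most in this class is turning the relative links sent by the endpoint into absolute ones, so that a client reading the directory can contact the sensor directly. A minimal standalone sketch of that rewriting, without LinkHeader, is below. The function absolutise is hypothetical, and it assumes that no quoted attribute value contains text of the form </...>, since a plain regular expression cannot tell those apart from link targets.

```python
import re

def absolutise(payload, base_url):
    """Prefix every relative link target in a Link Format payload with base_url."""
    # the 'con' parameter may arrive wrapped in quotes, so remove them
    base = base_url.strip('"').rstrip('/')
    # only targets starting with '/' are relative; absolute ones are left alone
    return re.sub(r'<(/[^>]*)>',
                  lambda m: '<' + base + m.group(1) + '>',
                  payload)

registered = absolutise(
    '</temperature>;rt="jan.newmarch:temperature-sensor"',
    '"coap://[fd28::2]"')
print(registered)
```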
The ResourceDirectory then takes POST requests, creates a RemoteResource and adds it to a dictionary keyed under the remote IP address:
    class ResourceDirectory(resource.Resource):
        def __init__(self):
            super(ResourceDirectory, self).__init__()
            self.rt = 'core.rd'
            self.directory = {}

        def render_post(self, request):
            global root

            rr = RemoteResource(request)

            # add the resource with URL /rd/<remote host>
            root.add_resource(('rd', rr.remote_host), rr)

            # add to our directory, keyed by the remote IP address
            self.directory[rr.remote_host] = rr

            # return URL for updates to registering agent
            content = '/rd/' + rr.remote_host
            payload = content.encode('utf8')
            mesg = aiocoap.Message(code=aiocoap.CHANGED, payload=payload)
            mesg.opt.content_format = CONTENT_FORMAT_CORE
            return mesg
ResourceDirectoryLookup
    class ResourceDirectoryLookup(resource.Resource):
        def __init__(self, resource_directory):
            super(ResourceDirectoryLookup, self).__init__()
            self.rt = 'core.rd-lookup'
            self.resource_directory = resource_directory
ResourceDirectoryLookupResources
    class ResourceDirectoryLookupResources(resource.Resource):
        def __init__(self, resource_directory):
            super(ResourceDirectoryLookupResources, self).__init__()
            self.resource_directory = resource_directory

        def render_get(self, request):
            payload = ''.encode('utf-8')
            for _, rr in self.resource_directory.directory.items():
                payload += (rr.payload + ',').encode('utf-8')
            # lose trailing ','
            payload = payload[:-1]

            # return the Link Format representation of the registered resources
            mesg = aiocoap.Message(code=aiocoap.CONTENT, payload=payload)
            mesg.opt.content_format = CONTENT_FORMAT_CORE
            return mesg
Installing all of these is done in the main routine by
    root.add_resource(('.well-known', 'core'),
                      resource.WKCResource(root.get_resources_as_linkheader))
    resource_directory = ResourceDirectory()
    root.add_resource(('rd',), resource_directory)
    root.add_resource(('rd-lookup',),
                      ResourceDirectoryLookup(resource_directory))
    root.add_resource(('rd-lookup','res'),
                      ResourceDirectoryLookupResources(resource_directory))
The complete server is directory.py
    #!/usr/bin/env python3

    import asyncio
    import aiocoap.resource as resource
    import aiocoap
    import re
    import cbor
    import link_header

    CONTENT_FORMAT_CORE = 40
    DEFAULT_PORT = 5683

    # the resource tree, set up in main()
    root = None

    class RemoteResource(resource.Resource):
        def __init__(self, request):
            super(RemoteResource, self).__init__()
            print('RemoteResource init-ing with request ', request)
            print('Remote is ', request.remote.sockaddr[0])

            # We use this as the key in a map to this object
            self.remote_host = request.remote.sockaddr[0]
            self.host = request.opt.uri_host
            print('Host key is ', self.host)

            # Check which query options are set
            self.url = self.ep = self.d = self.et = self.lt = None
            for query_elmt in request.opt.uri_query:
                # split on the first '=' only, as values may contain '='
                tag, _, value = query_elmt.partition('=')
                print(' tag %s value %s' % (tag, value))
                if tag == 'con':
                    self.url = value
                elif tag == 'ep':
                    self.ep = value
                elif tag == 'd':
                    self.d = value
                elif tag == 'et':
                    self.et = value
                elif tag == 'lt':
                    self.lt = value

            # Fix up url if it wasn't set by 'con' option
            if self.url is None:
                port = request.opt.uri_port if request.opt.uri_port else DEFAULT_PORT
                self.url = 'coap://[' + self.remote_host + ']:' + str(port)

            # now setup the links, with non-local url
            if request.opt.content_format == 0:
                # utf-8
                payload = request.payload.decode('utf-8')
            elif request.opt.content_format == 60:
                # CBOR
                payload = cbor.loads(request.payload)
            else:
                # anything else: treat as utf-8, the same as format 0
                payload = request.payload.decode('utf-8')

            links = link_header.parse(payload)
            for link in links.links:
                # lose " from string - why do they come in?
                link.href = self.url.strip('"') + link.href
                print('Full url is now %s' % (link.href))

            # capture the Link Format representation of the resource
            self.payload = str(links)
            print('End of RemoteResource init')

    class ResourceDirectory(resource.Resource):
        def __init__(self):
            super(ResourceDirectory, self).__init__()
            self.rt = 'core.rd'
            self.directory = {}

        def render_post(self, request):
            global root

            rr = RemoteResource(request)

            # add the resource with URL /rd/<remote host>
            root.add_resource(('rd', rr.remote_host), rr)

            # add to our directory, keyed by the remote IP address
            self.directory[rr.remote_host] = rr

            # return URL for updates to registering agent
            content = '/rd/' + rr.remote_host
            payload = content.encode('utf8')
            mesg = aiocoap.Message(code=aiocoap.CHANGED, payload=payload)
            mesg.opt.content_format = CONTENT_FORMAT_CORE
            return mesg

    class ResourceDirectoryLookup(resource.Resource):
        def __init__(self, resource_directory):
            super(ResourceDirectoryLookup, self).__init__()
            self.rt = 'core.rd-lookup'
            self.resource_directory = resource_directory

    class ResourceDirectoryLookupResources(resource.Resource):
        def __init__(self, resource_directory):
            super(ResourceDirectoryLookupResources, self).__init__()
            self.resource_directory = resource_directory

        def render_get(self, request):
            payload = ''.encode('utf-8')
            for _, rr in self.resource_directory.directory.items():
                payload += (rr.payload + ',').encode('utf-8')
            # lose trailing ','
            payload = payload[:-1]

            # return the Link Format representation of the registered resources
            mesg = aiocoap.Message(code=aiocoap.CONTENT, payload=payload)
            mesg.opt.content_format = CONTENT_FORMAT_CORE
            return mesg

    def main():
        # Resource tree creation
        global root
        root = resource.Site()

        root.add_resource(('.well-known', 'core'),
                          resource.WKCResource(root.get_resources_as_linkheader))

        resource_directory = ResourceDirectory()
        root.add_resource(('rd',), resource_directory)

        # There should be separate lookup types for 'd', 'ep', 'res' and 'gp'
        # We only do 'rd-lookup' (with no methods) to appear in
        # the WK/C list and 'rd-lookup/res' as the one with a GET method
        root.add_resource(('rd-lookup',),
                          ResourceDirectoryLookup(resource_directory))
        root.add_resource(('rd-lookup','res'),
                          ResourceDirectoryLookupResources(resource_directory))

        # There should be a group management handler
        # root.add_resource(('rd-group',), ResourceDirectoryGroup())

        # create_task works on Python 3.4.2 and later; asyncio.async
        # no longer parses on recent Python versions
        loop = asyncio.get_event_loop()
        loop.create_task(aiocoap.Context.create_server_context(root))
        loop.run_forever()

    if __name__ == "__main__":
        main()
A client has to do the work of getting the directory, sorting through it for what it wants, and then calling the appropriate server. It is directory_client.py
    #!/usr/bin/env python3

    import asyncio
    from aiocoap import *
    import link_header
    import sys
    import cbor

    def get_resource_directory_urls(links):
        print("Printing links\n")
        rd_href = rd_lookup_href = rd_group_href = None
        for link in links.links:
            print("one link is ", link)
            print('href: ', link.href)
            for (attr, value) in link.attr_pairs:
                # print('attr ', attr, ' value ', value)
                if attr == 'rt':
                    if value == 'core.rd':
                        rd_href = link.href
                    elif value == 'core.rd-lookup':
                        rd_lookup_href = link.href
                    elif value == 'core.rd-group':
                        rd_group_href = link.href
        return (rd_href, rd_lookup_href, rd_group_href)

    @asyncio.coroutine
    def main():
        protocol = yield from Context.create_client_context()

        request = Message(code=GET)
        request.set_request_uri('coap://[fd28::1]/.well-known/core')

        # get the url path for the RD lookup resource
        try:
            response = yield from protocol.request(request).response
        except Exception as e:
            print('Failed to fetch resource:', e)
            sys.exit(1)

        link = response.payload.decode('utf-8')
        print('WKC on dir is ', link)
        (rd_href, rd_lookup_href, rd_group_href) = \
            get_resource_directory_urls(link_header.parse(link))
        print('RD lookup is ', rd_lookup_href)

        # check the temp sensor is okay
        request = Message(code=GET)
        #request.set_request_uri('coap://[fd28::1]/.well-known/core?rt=core.rd')
        request.set_request_uri('coap://[fd28::2]/temperature')
        try:
            response = yield from protocol.request(request).response
        except Exception as e:
            print('Failed to fetch resource:', e)
            sys.exit(1)
        print('Result: %s\n%r'%(response.code, cbor.loads(response.payload)))

        # Find the remote resources in the RD
        request = Message(code=GET)
        request.set_request_uri('coap://[fd28::1]' + rd_lookup_href + '/res')
        try:
            response = yield from protocol.request(request).response
        except Exception as e:
            print('Failed to fetch resource:', e)
            sys.exit(1)
        print('Result: %s\n%r'%(response.code, response.payload.decode('utf-8')))

        # now find a temperature sensor
        links = link_header.parse(response.payload.decode('utf-8'))
        for link in links.links:
            for (tag, value) in link.attr_pairs:
                if tag == 'rt' and value == 'jan.newmarch:temperature-sensor':
                    # we've got a temp sensor, look it up
                    href = link.href
                    request = Message(code=GET)
                    print('Request uri is "%s"' % href)
                    request.set_request_uri(href)
                    try:
                        response = yield from protocol.request(request).response
                    except Exception as e:
                        print('Failed to fetch resource:', e)
                        sys.exit(1)
                    print('Result: %s\n%r'%(response.code, cbor.loads(response.payload)))

    if __name__ == "__main__":
        asyncio.get_event_loop().run_until_complete(main())
What I've talked about so far gets all the resources on an endpoint or gets all the endpoints on an RD. These may be very large numbers. An endpoint may support filtering to selectively get a subset. An RD is required to support filtering. That is, a client can ask for all temperature resources, or for all endpoints with a given name.
This filtering is done by adding URL query parameters such as
    GET coap://[fd28::1]/rd-lookup/res?ep="KitchenTemperature"
This just makes the code more complex without adding much to the concepts, so I don't give it here.
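For readers who want a feel for what such filtering involves, here is a minimal sketch of the matching step alone, independent of any CoAP library. It assumes each registration has been reduced to a plain dictionary of attributes, and that the query arrives as a list of 'name=value' strings in the way aiocoap presents uri_query. The names parse_query and filter_registrations, and the second sample registration, are hypothetical.

```python
def parse_query(uri_query):
    """Turn ['ep="KitchenTemperature"', ...] into {'ep': 'KitchenTemperature'}."""
    wanted = {}
    for item in uri_query:
        name, _, value = item.partition('=')
        # values may arrive wrapped in quotes, as in the example request
        wanted[name] = value.strip('"')
    return wanted

def filter_registrations(registrations, uri_query):
    """Keep only the registrations matching every query parameter."""
    wanted = parse_query(uri_query)
    return [reg for reg in registrations
            if all(reg.get(name) == value for name, value in wanted.items())]

# Illustrative data only: the second entry is invented for the example.
registrations = [
    {'ep': 'KitchenTemperature', 'rt': 'jan.newmarch:temperature-sensor',
     'href': 'coap://[fd28::2]/temperature'},
    {'ep': 'StudyTemperature', 'rt': 'jan.newmarch:temperature-sensor',
     'href': 'coap://[fd28::3]/temperature'},
]

matches = filter_registrations(registrations, ['ep="KitchenTemperature"'])
print([reg['href'] for reg in matches])
```

An empty query matches everything, which corresponds to the unfiltered lookup shown earlier.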
In all that I have done so far, the clients have been the principal agents: finding resources, querying for resource types and asking for values. The servers themselves have been co-operating in this, registering themselves with directories and putting type information in convenient places.
In this section I turn it around: the client tells the resource that it should play the active role, informing the client when a change occurs. This is like the publish-subscribe model of protocols like MQTT: the client subscribes to the resource and the resource publishes changes to all the clients.
The CoAP terminology for this is that such resources are observable. The formal specification is RFC 7641, "Observing Resources in the Constrained Application Protocol (CoAP)".
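The essential changes to the CoAP model discussed earlier are that a client adds an extra option (observe) to its GET request to subscribe to resource changes, if they are supported, and that the resource may then send multiple responses to that one request.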
The aiocoap library makes it pretty easy to become an observable resource. Instead of inheriting from the class resource.Resource, the resource inherits from resource.ObservableResource. This looks after the details of handling the GET request and sending multiple GET responses.
What the resource has to do is to monitor its own changes and decide when to tell the subscribers. The simplest way to do this is for the resource to sit in a loop monitoring its state at regular intervals. This can be done within the asyncio package by the call
    asyncio.get_event_loop().call_later(5, self.notify)
which schedules a single call to notify() 5 seconds later; notify() makes this call again each time it runs, so the check repeats every 5 seconds.
The notify method signals that a change should be published by calling the ObservableResource method updated_state. This then sends a GET reply to all subscribers using the already defined render_get method.
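The polling pattern can be tried out without any CoAP code at all. The sketch below uses only asyncio and needs a recent Python 3 for asyncio.run: a callback reads a value, reports it only when it differs from the previous reading, and then re-arms itself with call_later. The class PolledValue and the function demo are hypothetical names, and the readings are made-up numbers standing in for the sensor.

```python
import asyncio

class PolledValue:
    """Poll read() every `interval` seconds; call on_change() when it differs."""
    def __init__(self, read, on_change, interval, loop):
        self.read = read
        self.on_change = on_change
        self.interval = interval
        self.loop = loop
        self.last = None
        self.handle = None

    def poll(self):
        value = self.read()
        if value != self.last:
            self.last = value
            self.on_change(value)      # plays the role of updated_state()
        # call_later fires only once, so re-arm it on every poll
        self.handle = self.loop.call_later(self.interval, self.poll)

    def stop(self):
        if self.handle is not None:
            self.handle.cancel()

async def demo(readings, interval=0.01):
    """Feed a fixed list of readings through the poller; return what was reported."""
    loop = asyncio.get_running_loop()
    done = asyncio.Event()
    source = iter(readings)
    current = {'value': None}
    reported = []

    def read():
        try:
            current['value'] = next(source)
        except StopIteration:
            done.set()                 # no more readings: let demo() finish
        return current['value']

    watcher = PolledValue(read, reported.append, interval, loop)
    watcher.poll()
    await done.wait()
    watcher.stop()
    return reported

if __name__ == '__main__':
    print(asyncio.run(demo([36.3, 36.3, 36.9, 36.9, 37.1])))
```

Repeated readings are suppressed, so subscribers would hear only about genuine changes.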
For the temperature sensor, a slight re-arrangement of code to reflect this is in the program cbor_coap_observable_server.py
    import asyncio
    import aiocoap.resource as resource
    from aiocoap.resource import ObservableResource
    import aiocoap
    import re
    import cbor
    from subprocess import PIPE, Popen

    CONTENT_FORMAT_CBOR = 60
    CONTENT_FORMAT_CORE = 40

    class TemperatureResource(resource.ObservableResource):
        def __init__(self):
            super(TemperatureResource, self).__init__()
            self.temp = {'temperature': None, 'unit': None}
            self.notify() # start monitoring resource state

        def current_temperature(self):
            # vcgencmd prints a line such as: temp=36.3'C
            process = Popen(['vcgencmd', 'measure_temp'], stdout=PIPE)
            output, _error = process.communicate()
            fields = re.split("[='\n]", output.decode('utf-8'))
            return {'temperature' : float(fields[1]), 'unit' : fields[2]}

        def temperature_changed(self, new_temp):
            if (new_temp['temperature'] != self.temp['temperature']):
                return True
            if (new_temp['unit'] != self.temp['unit']):
                return True
            return False

        def notify(self):
            new_temp = self.current_temperature()
            if (self.temperature_changed(new_temp)):
                self.temp = new_temp
                self.updated_state() # inform subscribers
            else:
                print('No temp change')

            # check again after 5 seconds
            asyncio.get_event_loop().call_later(5, self.notify)

        @asyncio.coroutine
        def render_get(self, request):
            print('Render get requested')
            mesg = aiocoap.Message(code=aiocoap.CONTENT,
                                   payload=cbor.dumps(self.temp))
            mesg.opt.content_format = CONTENT_FORMAT_CBOR
            return mesg

    def main():
        # Resource tree creation
        root = resource.Site()
        root.add_resource(('temperature',), TemperatureResource())

        # create_task works on Python 3.4.2 and later; asyncio.async
        # no longer parses on recent Python versions
        loop = asyncio.get_event_loop()
        loop.create_task(aiocoap.Context.create_server_context(root))
        loop.run_forever()

    if __name__ == "__main__":
        main()
It's not so straightforward getting a client to work. The aiocoap package you would have downloaded for the previous article was designed for Python 3.4, and that works fine on the RPi. But there isn't a simple example of a client for an observable resource and the one given in the tests directory is so arcane that I haven't been able to get it to work by itself.
Just in the last few weeks though (at the time of writing) the aiocoap package has been upgraded to Python 3.5, and there is a nice simple example of a client, using the new async def statement. That's good for distros such as Ubuntu which are up to Python 3.5, but not for the RPi which is still on Python 3.4.
In the expectation that you are more likely to run this client on another non-RPi system and that the RPi will be upgraded to 3.5 sometime anyway, I will use the 3.5 example. [footnote: you can build Python 3.5 on the RPi yourself anyway. A Google search will turn up instructions.]
Download and install the latest aiocoap package, into Python 3.5 this time, not 3.4:
    git clone --depth=1 https://github.com/chrysn/aiocoap.git
    cd aiocoap/
    sudo mv aiocoap /usr/lib/python3.5
The client sets up a message request as before but with an additional observe flag, and creates a protocol request object. In a normal GET, it asks for a single response from the protocol request. Now it asks for a set of responses from the protocol request's observation. The code is cbor_coap_observable_client.py:
    #!/usr/bin/env python3

    import asyncio
    import cbor
    from aiocoap import *

    CONTENT_FORMAT_CBOR = 60

    async def main():
        protocol = await Context.create_client_context()

        request = Message(code=GET,
                          uri='coap://[fd28::2]/temperature',
                          observe=0)
        pr = protocol.request(request)

        # Note that it is necessary to start sending
        r = await pr.response
        print("Initial temperature: %r"%(cbor.loads(r.payload)))

        # each later notification arrives through the observation
        async for r in pr.observation:
            if (r.opt.content_format == CONTENT_FORMAT_CBOR):
                print("New temperature: %r"%(cbor.loads(r.payload)))
            else:
                print('Unknown format')

    if __name__ == "__main__":
        asyncio.get_event_loop().run_until_complete(main())
This series has addressed the issues of setting up a 6LoWPAN low power wireless network, using the OpenLabs radios on Raspberry Pi's, followed by bringing these devices into internet visibility, and in this article looking at data formats and protocols for the IoT.
There have been many topics omitted: the major one is that of security, as the system we have described is wide open to snooping and hacking. The security mechanisms are all there, but are a full topic in their own right.
The programs in this series are all on my web site https://jan.newmarch.name/IoT/LinuxJournal/programs.zip