Joost

Collecting data from Onion Omega via SSE

A 9-minute read, published on 15 June 2018

In this series I’ll explore ways to collect data from several OpenWrt-based devices called Onion Omega. The data will come from sensors attached to these Onion Omegas. The little boards will make the sensor data available via Server-Sent Events (SSE) in the default format.

SSE message format

Before we can do anything with data, we need to collect it from somewhere. We’ll first create a script that reads data from a sensor connected to an Onion Omega and makes it available via SSE to any client that connects to the device.

Let’s try to figure out what an SSE stream should look like. The MIME type of an SSE stream should be text/event-stream, and the basic message format, when decoded as UTF-8, looks something like

event: add
data: 73857293

event: message
data: This is the second message, it
data: has two lines.

event: remove
data: 113411

When no event type is specified, it defaults to ‘message’.
For now I’ll make each event contain a single data line holding JSON, which will look something like

event: message
data: {"someKey": "someValue"}
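The format is simple enough to produce by hand. As a minimal sketch (the formatEvent helper and its argument names are my own invention, not part of any library), a function that serializes one event to its raw text could look like this. It spreads multi-line data over several data: lines, as in the second example message, and accepts an optional id field, which the SSE format also allows.

```javascript
// Hypothetical helper: serialize one SSE event to its wire format.
function formatEvent(event) {
  var lines = [];
  if (event.type) {
    lines.push('event: ' + event.type);
  }
  if (event.id !== undefined) {
    lines.push('id: ' + event.id);
  }
  // Every line of the payload gets its own "data:" field.
  String(event.data).split('\n').forEach(function (line) {
    lines.push('data: ' + line);
  });
  // A blank line terminates the event.
  return lines.join('\n') + '\n\n';
}

console.log(formatEvent({
  type: 'message',
  data: JSON.stringify({ someKey: 'someValue' })
}));
```

Leaving out the type produces an event without an event: line, which clients treat as the default ‘message’ type.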

Now that we’ve decided on a basic raw SSE data format, we should probably look at how to collect the data.

Collecting data

To make the data available we can implement a server that clients connect to in order to request the data stream. For this we’ll use Node.js. The Onion Omega supports Node.js; we just need to install it via the package manager.

I started with a clean Onion Omega, factory reset by holding the button for 10 seconds. After it boots, connect to its access point and open http://omega-xxxx.local/, where xxxx identifies your Omega. Log in to the admin panel, add your home Wi-Fi settings and enable them. When done, you should be able to connect to the device over SSH from your computer (when on the same Wi-Fi network).

On the Omega run the following commands to install Node.js and some additional tools

$ opkg update
$ opkg install nodejs
$ opkg install vim
$ opkg install wget

This can take a while and will take up an additional 8 MB on the Omega. That is quite a lot, as the Omega has only 16 MB of storage in total (by default), so the next script will use as few extra dependencies as possible. As a sensor we use a DHT11, which measures temperature and humidity; to support it we also need to install a ‘driver’ that reads out the values. Let’s make a project directory named sensor on the Omega and then run the following commands from this new directory

# to get the driver, originally made by Adafruit
$ wget https://community.onion.io/uploads/files/1450434316215-checkhumidity.tar.gz

# to untar the compressed package
$ tar -zxvf 1450434316215-checkhumidity.tar.gz

# to make the binary executable
$ chmod -R 755 checkHumidity/bin/checkHumidity

Then open up vim on the Omega

$ vim sse.js

And add the following code to it

var http = require('http');
var path = require('path');
var sync = require('child_process').spawnSync;
// We depend on a binary
var cmd = path.resolve(__dirname, './checkHumidity/bin/checkHumidity');
var connectionCounter = 1;

http.createServer(function (req, res) {

  if (req.url === '/sse') {

    var clientId = connectionCounter++;
    var eventCounter = 1;

    // Log information about the connecting client.
    console.log('Client connected:');
    console.log('\tconnection #' + clientId);
    console.log('\tLast-Event-Id: ' + req.headers['last-event-id']);

    // Tell clients the data should be interpreted as SSE stream
    // and to not cache the data.
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache'
    });

    // Respond with an SSE event periodically to connected clients
    var ticker = setInterval(function () {
      res.write('event: message\n'); // default event type
      res.write('id: ' + (clientId * 1000 + eventCounter) + '\n'); // id to track Last-Event-Id seen on client
      res.write('data: ' + readSensor() + '\n\n'); // JSON payload with the sensor readings
      eventCounter++;
    }, 1000);

    // Stop sending SSE events when a client disconnects
    req.on('close', function () {
      console.log('Client disconnected from event stream (connection #' + clientId + ')');
      res.end();
      clearInterval(ticker);
    });

  } else {
    res.writeHead(404);
    res.end();
  }

}).listen(8888);

function readSensor() {
  return JSON.stringify({
    date: new Date().toISOString(),
    timestamp: Date.now(),
    readings: sensorValue(),
    units: ['%', '°C']
  });
}

function sensorValue() {
  // the DHT11 sensor is connected to PIN 19
  // spawnSync returns stdout as a Buffer unless an encoding is given,
  // so ask for a string before splitting it into lines
  var result = sync(cmd, ['19', 'DHT11'], { encoding: 'utf8' });
  return result.stdout.trim().split('\n');
}

// Helper for generating fake readings; not used by the server above.
function randomValue(min, max) {
  return Math.floor(Math.random() * (max - min + 1) + min);
}

This simple SSE server can be run with node sse.js on the Omega. We can then connect to it
with curl -H "Accept: text/event-stream" http://omega-xxxx.local:8888/sse -vvvv
which will output something like:

* Connected to omega-1e13.local (192.168.178.30) port 8888 (#0)
> GET /sse HTTP/1.1
> Host: omega-1e13.local:8888
> User-Agent: curl/7.43.0
> Accept: text/event-stream
>
< HTTP/1.1 200 OK
< Content-Type: text/event-stream
< Cache-Control: no-cache
< Date: Sat, 10 Sep 2016 04:32:33 GMT
< Connection: keep-alive
< Transfer-Encoding: chunked
<
event: message
id: 1074
data: {"date":"2016-09-11T11:30:25.217Z","reading":-4,"unit":"°C"}
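curl is handy for a quick check, but the same text is easy to take apart in JavaScript without extra dependencies. The following is a rough sketch, not a complete implementation of the SSE parsing rules: parseEvent is a name I made up, it expects one event block (the text between two blank lines), and it simply skips comment lines and lines without a field name.

```javascript
// Hypothetical helper: parse one raw SSE block into an object.
function parseEvent(block) {
  var event = { type: 'message', id: undefined, data: [] };
  block.split('\n').forEach(function (line) {
    var sep = line.indexOf(':');
    if (sep <= 0) {
      return; // comment line (starts with ":") or no field name
    }
    var field = line.slice(0, sep);
    // A single space after the colon is not part of the value.
    var value = line.slice(sep + 1).replace(/^ /, '');
    if (field === 'event') {
      event.type = value;
    } else if (field === 'id') {
      event.id = value;
    } else if (field === 'data') {
      event.data.push(value);
    }
  });
  // Multiple data lines are joined with newlines.
  event.data = event.data.join('\n');
  return event;
}

var raw = 'event: message\nid: 1074\ndata: {"reading":-4,"unit":"°C"}';
var parsed = parseEvent(raw);
console.log(parsed.type, parsed.id, JSON.parse(parsed.data).reading);
```

A real client would also have to buffer incoming chunks and split them on blank lines before calling a function like this.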

Now that we have some data on the wire, we can create an intermediate service that serves as a proxy to the data the Omega gathers for us.

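To make this script run whenever we (re)boot the Omega, we can add one line to /etc/rc.local. The trailing & starts the server in the background, so the rest of the boot sequence is not held up by the long-running process.

# Put your custom commands here that should be executed once
# the system init finished. By default this file does nothing.

node /root/sse.js &
exit 0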

SSE client proxy

Since we could have multiple data sources in the future, it might be a good idea to create a proxy that makes it easier to collect or monitor all changes. You wouldn’t want to connect yourself to each device (in our case an Omega) that hosts a sensor. The proxy below depends on the eventsource package from npm (npm install eventsource).

var http = require('http');
var URL = require('url');
var EventSource = require('eventsource');
var Transform = require('stream').Transform;
var port = process.argv[2] || 8001;

var options = {
  url: 'http://omega-1e13.local:8888/sse'
};

// Connect to the SSE stream of the device
var source = new EventSource(options.url);

// Parse each message and add the hostname of the device it came from
var stream = new Transform({
  transform(message, encoding, next) {
    var eventData = JSON.parse(message);
    var sensorData = JSON.parse(eventData.data);

    sensorData.sensorId = URL.parse(eventData.origin).hostname;

    this.push(JSON.stringify(sensorData) + '\n');

    next();
  }
});

stream.on('error', function (err) {
  console.log(err);
});

// Feed every incoming SSE message into the transform stream
source.on('message', function (message) {
  stream.write(JSON.stringify(message) + '\n');
});

source.on('error', function (err) {
  console.log(err);
});

// Serve the transformed data as newline-delimited JSON
function handleRequest(req, res) {
  if (req.url === '/temperatures.json') {
    stream.pipe(res);
  } else {
    res.writeHead(404);
    res.end();
  }
}

var server = http.createServer(handleRequest);
server.listen(port, function log() {
  console.log("Server running on %s", port);
});

This code creates an EventSource, which is used to connect to the device’s SSE stream. Then we create a Transform stream, which parses the message payload and adds the origin of the stream, so we can see which connected device is sending what. We then add an error handler that simply logs errors to the console. Next, a message handler is added to the EventSource: each time a message arrives we write it into the transform stream. We also add an error handler to the EventSource. Last, we configure a request handler for our server. This handler listens for incoming requests on /temperatures.json and pipes the transform stream into the response.
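To see what the transform step does to a single message, it can be pulled out into a plain function. This is only a sketch: tagWithSensorId is a made-up name, the readings in the sample are invented values, and I use the built-in URL class here instead of url.parse, which gives the same hostname for this kind of address.

```javascript
// Hypothetical stand-alone version of the transform step.
// `message` mimics the event object: `data` holds the JSON payload,
// `origin` the origin of the stream the message came from.
function tagWithSensorId(message) {
  var sensorData = JSON.parse(message.data);
  sensorData.sensorId = new URL(message.origin).hostname;
  // One JSON document per line (newline-delimited JSON).
  return JSON.stringify(sensorData) + '\n';
}

var line = tagWithSensorId({
  data: '{"readings":["40","21"],"units":["%","°C"]}', // invented sample values
  origin: 'http://omega-1e13.local:8888'
});
console.log(line);
```

Because the hostname is taken from the origin of each message, the same step would keep working unchanged if more devices were added later.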

When we start this server, it will try to connect to the SSE endpoint of the Onion Omega. The proxy starts passing the data on once a client connects to /temperatures.json. When we connect to this endpoint with e.g. curl http://localhost:8001/temperatures.json we should see incoming JSON messages every second or so.
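A program that consumes this endpoint receives the lines in arbitrary chunks, so a message may be cut in half between two reads. One way to deal with that, sketched here under my own naming (createLineSplitter is hypothetical, and omega-0000.local is an invented second device), is to buffer the text and only hand over complete lines.

```javascript
// Hypothetical helper: turns arbitrary text chunks into complete lines.
function createLineSplitter(onLine) {
  var buffer = '';
  return function (chunk) {
    buffer += chunk;
    var lines = buffer.split('\n');
    buffer = lines.pop(); // keep the unfinished tail for the next chunk
    lines.forEach(function (line) {
      if (line) {
        onLine(line);
      }
    });
  };
}

var received = [];
var feed = createLineSplitter(function (line) {
  received.push(JSON.parse(line));
});

// Two chunks that cut a message in half, as a network read may do.
feed('{"sensorId":"omega-1e13.local"}\n{"sensor');
feed('Id":"omega-0000.local"}\n');
console.log(received.length);
```

In a real client, feed would be called from the 'data' handler of an http.get response after calling res.setEncoding('utf8').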

Conclusion

We looked at a relatively simple way to collect data from sensors. When you have many more devices to read, this might not be a good solution, as the server needs to know how to connect to each device.
Also, anyone on the network can read the data, as there are no secure connections.
When you just want to display sensor data in real time on a locally running web page, this is probably good enough.
