Streaming Events

An example of how to use the REST API to stream sensor events in real time.

Last updated 1 year ago

Overview

When plotting live graphs or triggering certain alarms, a continuous stream of data is often preferred over periodic polling. In this example, we will see how the REST API can be used to set up a stream that follows some best practices and includes a simple retry policy.

Preliminaries

  • Data Connectors If you want to forward your data in a server-to-server integration, consider using Data Connectors for a simpler and more reliable service with an added at-least-once guarantee.

  • Basic Auth For simplicity, we use Basic Auth for authentication here. We recommend replacing this with an OAuth2 flow for integrations more complex than local experimentation.

  • Service Account Credentials You must create a Service Account and know its credentials. Any role will suffice.

  • REST API This example uses our REST API to interact with our cloud. See the REST API Reference for a full list of available endpoints.

Response Format

Our REST API :stream endpoints support two response formats, text/event-stream and application/json. The format is set using the Accept header and defaults to application/json.

headers = { "Accept": "application/json" }  # default
headers = { "Accept": "text/event-stream" } # alternative

Each format suits a different use case.

  • application/json: Returns a JSON object for each event, separated by line-breaks. Easy to parse in any high-level language and used in the code sample below.

  • text/event-stream: The Server-Sent Events format. Used by Server-Sent Events libraries, such as EventSource.
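
As a rough illustration of the application/json format, the sketch below parses a single line of the stream. The payload shape (a top-level result holding an event) follows the example code later on this page; the helper name parse_stream_line and the sample line are hypothetical and only serve the illustration.

```python
import json


def parse_stream_line(line: bytes):
    """Return the event dict from one stream line, or None for an empty line."""
    # Empty lines carry no payload and can be skipped.
    if not line.strip():
        return None

    payload = json.loads(line.decode("utf-8"))

    # Anything without a "result" is treated as an error payload.
    if "result" not in payload:
        raise RuntimeError(payload)

    return payload["result"]["event"]


# Hypothetical sample line, shaped like the payloads handled in the example code.
sample = b'{"result": {"event": {"eventType": "touch", "targetName": "projects/p/devices/d"}}}'
event = parse_stream_line(sample)
print(f'Got {event["eventType"]} event.')
```

Keeping the parsing in one small function makes it easy to test without an open connection.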

Stream Best Practices

The following practices should be considered when implementing a stream. They are all used in the following example.

  • Implement a Retry Policy The connection to a stream can be lost at any moment and for several reasons, so disconnects should be handled by a retry policy. The stream always disconnects after one hour, as that is how long the access token lasts. If you have not received a single event for the full hour, the stream disconnects with a 408 HTTP status code.

  • Detect Stream Disconnects Early An optional ping_interval query parameter may be used to make sure the client can still receive messages from the server. If no ping messages have been received in the specified interval, the client should reconnect to the stream.

  • Filter Events By default, all event types are included in the stream. Use query parameters to reduce the number of events that need to be processed.

  • Authorization Header Use the Authorization header when possible. Some libraries, like the built-in EventSource class in browsers, do not allow setting headers. In this case, the token query parameter can be used instead.
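
To make the query parameters above concrete, here is a minimal sketch that assembles a stream URL. The parameter names event_types, ping_interval, and token are the ones used on this page; the helper build_stream_url is hypothetical, and repeating event_types once per value is an assumption that mirrors how the requests package encodes list parameters.

```python
from urllib.parse import urlencode

BASE_URL = "https://api.d21s.com/v2"


def build_stream_url(project_id, event_types=(), ping_interval=None, token=None):
    """Build a devices:stream URL with optional filtering, ping, and token."""
    params = []

    # Assumption: one event_types entry per requested event type.
    for event_type in event_types:
        params.append(("event_types", event_type))

    # Ping interval in seconds, formatted like "10s".
    if ping_interval is not None:
        params.append(("ping_interval", f"{ping_interval}s"))

    # Only needed when the Authorization header cannot be set.
    if token is not None:
        params.append(("token", token))

    url = f"{BASE_URL}/projects/{project_id}/devices:stream"
    return f"{url}?{urlencode(params)}" if params else url


print(build_stream_url("my-project", ["temperature", "touch"], ping_interval=10))
```

Prefer the Authorization header over the token parameter wherever the client library allows it, since URLs are more likely to end up in logs.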

Example Code

The following points summarize the provided example code.

  • Send a GET request to the REST API to initialize an event stream.

  • Keep the TCP connection open while receiving events.

  • If the connection is lost or too much time passes between pings, retry N times before giving up.
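
The retry step can be sketched in isolation as follows. This is one possible shape under stated assumptions, not the exact policy used in the example code: the helpers backoff_delays and run_with_retries are hypothetical, connect stands in for whatever function opens and consumes the stream, and the 60 second cap is an arbitrary choice.

```python
import time


def backoff_delays(max_retries, base=2.0, cap=60.0):
    """Seconds to wait before each retry: base**0, base**1, ... up to cap."""
    return [min(cap, base ** n) for n in range(max_retries)]


def run_with_retries(connect, max_retries=5, sleep=time.sleep):
    """Call connect() until it returns, retrying on any exception."""
    for delay in backoff_delays(max_retries):
        try:
            return connect()
        except Exception as exc:
            print(f"Stream failed ({exc}). Retrying in {delay:.0f}s.")
            sleep(delay)

    # Final attempt after all retries are used; errors propagate to the caller.
    return connect()


print(backoff_delays(5))  # [1.0, 2.0, 4.0, 8.0, 16.0]
```

Passing sleep as an argument keeps the helper testable, since a test can record the delays instead of actually waiting.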

Environment Setup

If you wish to run the code locally, make sure you have a working runtime environment.

The following packages are required by the Python example code.

pip install requests==2.31.0
pip install --upgrade disruptive

The following module is required by the Node.js example code and must be installed.

npm install eventsource@2.0.2

The Go example uses only the standard library, so no additional modules are required.

Add environment variables for authentication details.

export DT_SERVICE_ACCOUNT_KEY_ID=<YOUR_SERVICE_ACCOUNT_KEY_ID>
export DT_SERVICE_ACCOUNT_SECRET=<YOUR_SERVICE_ACCOUNT_SECRET>
export DT_SERVICE_ACCOUNT_EMAIL=<YOUR_SERVICE_ACCOUNT_EMAIL>
export DT_PROJECT_ID=<YOUR_PROJECT_ID>
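
Before running an example, it can help to confirm that the variables are actually set. The check below is a small optional sketch; the helper missing_env is hypothetical, and it only checks the three variables read by the requests-based example.

```python
import os

# Variables read by the requests-based example on this page.
REQUIRED = (
    "DT_SERVICE_ACCOUNT_KEY_ID",
    "DT_SERVICE_ACCOUNT_SECRET",
    "DT_PROJECT_ID",
)


def missing_env(names=REQUIRED, environ=os.environ):
    """Return the names that are unset or empty in the given environment."""
    return [name for name in names if not environ.get(name)]


missing = missing_env()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
```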

Source

The following code snippets implement streaming in a few languages, starting with Python and the requests package.

import os
import time
import json

import requests

# Service Account credentials.
SERVICE_ACCOUNT_KEY_ID = os.environ.get('DT_SERVICE_ACCOUNT_KEY_ID')
SERVICE_ACCOUNT_SECRET = os.environ.get('DT_SERVICE_ACCOUNT_SECRET')

# Construct API URL.
PROJECT_ID = os.environ.get('DT_PROJECT_ID')
BASE_URL = 'https://api.d21s.com/v2'
DEVICES_STREAM_URL = '{}/projects/{}/devices:stream'.format(
    BASE_URL,
    PROJECT_ID
)

# A few constants to control stream behavior.
MAX_CONNECTION_RETRIES = 5
PING_INTERVAL = 10
PING_JITTER = 2


if __name__ == '__main__':
    # Set up a simple catch-all retry policy.
    nth_retry = 0
    while nth_retry <= MAX_CONNECTION_RETRIES:
        try:
            print('Streaming... Press CTRL+C to terminate.')
            # Set up a stream connection.
            # Connection will timeout and reconnect if no single event
            # is received in an interval of PING_INTERVAL + PING_JITTER.
            stream = requests.get(
                url=DEVICES_STREAM_URL,
                auth=(SERVICE_ACCOUNT_KEY_ID, SERVICE_ACCOUNT_SECRET),
                stream=True,
                timeout=PING_INTERVAL + PING_JITTER,
                params={
                    'event_types': [],
                    'ping_interval': '{}s'.format(PING_INTERVAL),
                },
            )

            # Iterate through the events as they come in (one per line).
            for line in stream.iter_lines():
                # Skip empty keep-alive lines.
                if not line:
                    continue

                # Decode the response payload and break on error.
                payload = json.loads(line.decode('utf-8'))
                if 'result' not in payload:
                    raise Exception(payload)
                event = payload['result']['event']

                # Skip ping events.
                if event['eventType'] == 'ping':
                    continue

                # Print events as they arrive.
                print(f'Got {event["eventType"]} event.')

                # Reset retry counter.
                nth_retry = 0

        except KeyboardInterrupt:
            break

        except requests.exceptions.ConnectionError:
            print('Connection lost. Reconnecting...')
            nth_retry += 1

        except Exception as e:
            print(e)

            # Print the error and try again up to MAX_CONNECTION_RETRIES.
            if nth_retry < MAX_CONNECTION_RETRIES:
                print('Something happened. Retry #{}'.format(nth_retry+1))

                # Exponential backoff in sleep time.
                time.sleep(2**nth_retry)
                nth_retry += 1
            else:
                break

The same stream set up with the Python API:

import os

import disruptive as dt

PROJECT_ID = os.environ.get('DT_PROJECT_ID', '')


if __name__ == '__main__':
    # Enable logging to see what's happening under the hood.
    dt.log_level = 'debug'

    # Initialize a stream generator for all event types in a project.
    for e in dt.Stream.event_stream(PROJECT_ID, event_types=[]):
        # Print event information.
        print(f'Got {e.event_type} event.')

The same stream set up in Node.js with the eventsource package:

const EventSource = require("eventsource")

// Service Account credentials
const serviceAccountKeyID  = process.env.DT_SERVICE_ACCOUNT_KEY_ID
const serviceAccountSecret = process.env.DT_SERVICE_ACCOUNT_SECRET

// Construct API URL
const projectID = process.env.DT_PROJECT_ID
const apiBase = 'https://api.d21s.com/v2/'
const devicesStreamUrl = apiBase + `projects/${projectID}/devices:stream`

// Constants
const maxConnectionRetries = 5  // Max retries without any received messages
const pingInterval         = 10 // Expected interval between pings in seconds
const pingJitter           = 2  // Expected ping jitter in seconds

async function main() {
    let retryCount = 0
    let stream
    setupStream()

    // Sets up a timer that will restart the stream if too much time has
    // passed between ping events. This timer is reset every time we
    // receive a ping.
    const pingTimer = setTimeout(() => {
        console.log("Too long between pings. Reconnecting...")
        clearTimeout(pingTimer)
        setTimeout(() => { // Wait a second before reconnecting
            setupStream()
        }, 1000)
    }, (pingInterval + pingJitter) * 1000)

    async function setupStream() {
        // If we've retried too many times without getting any messages, exit
        if (retryCount >= maxConnectionRetries) {
            console.log("Retried too many times. Exiting")
            process.exit(1)
        }
        retryCount += 1
        
        // Close the existing stream if we have one
        if (stream) {
            stream.close()
        }

        console.log('Streaming... Press CTRL+C to exit.')

        // Add query parameters to the URL
        let url = devicesStreamUrl
        url += `?ping_interval=${pingInterval}s` // Specifies ping interval

        // Prepare the "Authorization" header with basic auth.
        // NOTE: This should be implemented using OAuth2 in a production environment.
        const basicAuthStr = `${serviceAccountKeyID}:${serviceAccountSecret}`
        const headers = {
            Authorization: "Basic " + Buffer.from(basicAuthStr).toString("base64")
        }
        
        // Set up a new stream with callback functions for messages and errors.
        // Setting headers only works with the external "eventsource" package. In a
        // browser environment, either use a polyfill or the "token" query parameter
        // with an access token from OAuth2.
        stream = new EventSource(url, { headers })
        stream.onmessage = handleStreamMessage
        stream.onerror = handleStreamError
    }
    
    function handleStreamError(err) {
        console.error("Got error from stream:")
        console.error(err)
        console.log("Reconnecting...")
        
        clearTimeout(pingTimer)
        setTimeout(() => { // Wait a second before reconnecting
            setupStream()
        }, 1000)
    }
    
    function handleStreamMessage(message) {
        // Parse the payload as JSON
        const data = JSON.parse(message.data)

        // Check if we got an error
        if (data?.error) {
            handleStreamError(data.error)
            return
        }

        // Reset the retry counter now that we've got an event
        retryCount = 0

        // Parse the event object
        const event = data.result.event
        if (event.eventType === "ping") {
            // We got a ping event. Reset the ping timer
            pingTimer.refresh()
        } else {
            // We got an event of some other type
            console.log(`Got ${event.eventType} event.`)
        }
    }
}
main().catch((err) => {console.log(err)})

Note that this Go example only handles temperature events and must be expanded for other types.

package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"strings"
	"time"
)

// Events received from the stream will be in this format.
// Either `Result` or `Error` will be set, the other will be `nil`.
type StreamEvent struct {
	Result *struct {
		Event struct {
			EventId    string          `json:"eventId"`
			TargetName string          `json:"targetName"`
			EventType  string          `json:"eventType"`
			Data       json.RawMessage `json:"data"`
			Timestamp  string          `json:"timestamp"`
		} `json:"event"`
	} `json:"result"`
	Error *struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
		Details []struct {
			Help string `json:"help"`
		} `json:"details"`
	} `json:"error"`
}

// A TemperatureEvent will be available in event's `Data`
// when the `EventType` is `temperature`.
type TemperatureEvent struct {
	Temperature struct {
		Celsius    float32 `json:"value"`
		UpdateTime string  `json:"updateTime"`
		Samples    []struct {
			Celsius    float32 `json:"value"`
			SampleTime string  `json:"sampleTime"`
		} `json:"samples"`
	} `json:"temperature"`
}

const (
	pingInterval = time.Second * 10 // How often we want a ping from the server
	pingLeeway   = time.Second * 2  // Allow for pings to arrive a little late
)

var (
	// This timer will time out if we have not received a ping from the
	// server within the expected interval. We'll use this as a sign that
	// we have lost connection to the server, and then reconnect.
	pingTimer = time.NewTimer(pingInterval + pingLeeway)
)

// Assumes that these environment variables are set before running the script.
var (
	serviceAccountKeyID     = os.Getenv("DT_SERVICE_ACCOUNT_KEY_ID")
	serviceAccountKeySecret = os.Getenv("DT_SERVICE_ACCOUNT_SECRET")
	projectID               = os.Getenv("DT_PROJECT_ID")
)

func startStream(eventTypes []string) {
	// Create the request
	url := fmt.Sprintf("https://api.d21s.com/v2/projects/%s/devices:stream", projectID)
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		log.Fatalf("Unable to create request: %v", err)
	}

	// Set the "Authorization" header with basic auth.
	// NOTE: OAuth2 authentication should be used in production
	req.SetBasicAuth(serviceAccountKeyID, serviceAccountKeySecret)

	// Add query parameters to the request.
	// "pingInterval" tells the server to send us periodic pings to let us know the connection is still up.
	// "eventTypes" specifies which event types we want to receive. No specified eventTypes implies all types.
	queryParams := req.URL.Query()
	queryParams.Add("pingInterval", fmt.Sprintf("%.0fs", pingInterval.Seconds()))
	for _, eventType := range eventTypes {
		queryParams.Add("eventTypes", eventType)
	}
	req.URL.RawQuery = queryParams.Encode()

	// Connect to the stream. This will hang until we get the first event...
	fmt.Println("Connecting to stream...")
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Printf("Failed to connect to stream: %v\n", err)
		return
	}
	fmt.Printf("Connected to the stream with status code %d\n", resp.StatusCode)

	// Make sure we close the body when we're done with it
	defer resp.Body.Close()

	// Scanner to read each line in the stream
	scanner := bufio.NewScanner(resp.Body)

	// Channel used to receive raw events from the scanner
	eventChan := make(chan []byte)

	// Channel used to know when the stream has disconnected
	doneChan := make(chan struct{})

	// Scan for events.
	// Events are received as JSON blobs separated by "\n". scanner.Scan() will return
	// true when it has received a new "\n", and return false when the stream disconnects.
	// The event payload is available in scanner.Bytes().
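	go func() {
		for scanner.Scan() {
			// Copy the bytes, as the scanner may reuse its buffer on the next Scan().
			rawEvent := append([]byte(nil), scanner.Bytes()...)
			eventChan <- rawEvent
		}
		doneChan <- struct{}{}
	}()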

	// Wait for either the pingTimer to go off, raw events to come in, or body to be closed.
	for {
		select {
		case <-pingTimer.C:
			fmt.Println("Missed ping. Reconnecting...")
			return
		case rawEvent := <-eventChan:
			handleRawEvent(rawEvent)
		case <-doneChan:
			fmt.Printf("Stream disconnected. Error: %v\n", scanner.Err())
			return
		}
	}
}

func handleRawEvent(rawEvent []byte) {
	// Unmarshal the stream event
	var streamEvent StreamEvent
	if err := json.Unmarshal(rawEvent, &streamEvent); err != nil {
		log.Fatalf("Unable to unmarshal stream payload %s: %v", string(rawEvent), err)
	}

	if streamEvent.Error != nil {
		fmt.Printf("Got error message: %s\n", streamEvent.Error.Message)
		return
	}

	event := streamEvent.Result.Event

	// Check first if we got a ping event
	if event.EventType == "ping" {
		// We got a ping, so we know the stream is still up
		pingTimer.Reset(pingInterval + pingLeeway)
		return
	}

	// The `targetName` field has the format: "projects/<PROJECT_ID>/devices/<DEVICE_ID>"
	deviceID := strings.Split(event.TargetName, "/")[3]

	// Figure out which event type we got
	switch event.EventType {
	case "temperature":
		// Unmarshal temp data
		var tempEvent TemperatureEvent
		if err := json.Unmarshal(event.Data, &tempEvent); err != nil {
			log.Fatalf("Unable to unmarshal temperature event %s: %v", string(event.Data), err)
		}
		handleTempEvent(tempEvent, deviceID)
	default:
		// Not yet handling this event type
		fmt.Printf("Got %s event -> %s\n", event.EventType, string(event.Data))
	}
}

func handleTempEvent(tempEvent TemperatureEvent, deviceID string) {
	timestamp, err := time.Parse(time.RFC3339, tempEvent.Temperature.UpdateTime)
	if err != nil {
		log.Fatalf("Unable to parse temperature timestamp %s: %v", tempEvent.Temperature.UpdateTime, err)
	}

	fmt.Printf("%.2f°C from %s at %s\n",
		tempEvent.Temperature.Celsius,
		deviceID,
		timestamp,
	)

	// Do more processing here...
}

func main() {
	// Always keep the stream connected. Wait 1 sec before reconnecting
	for {
		startStream([]string{"temperature"})
		time.Sleep(time.Second)
		fmt.Println("Reconnecting...")
	}
}

Expected Output

For each new event in the stream, a line will be printed to stdout.

Streaming... Press CTRL+C to terminate.
Got touch event.
Got networkStatus event.
...

The latest version of our Python API can be installed through pip.
