From 309cdbdd7ce2a2ee833bb48c86604fae968c093d Mon Sep 17 00:00:00 2001 From: Simon Prickett Date: Thu, 13 Jan 2022 16:57:03 +0000 Subject: [PATCH] Add stream examples. (#1830) * Adds stream consumer and producer scripts to doc. * Updated build command example. * Adds stream producer example. * Adds basic stream consumer example. * Added isolated execution. * Update README.md * Update stream-consumer.js Co-authored-by: Leibale Eidelman --- examples/README.md | 4 ++- examples/stream-consumer.js | 65 +++++++++++++++++++++++++++++++++++++ examples/stream-producer.js | 28 ++++++++++++++++ 3 files changed, 96 insertions(+), 1 deletion(-) create mode 100644 examples/stream-consumer.js create mode 100644 examples/stream-producer.js diff --git a/examples/README.md b/examples/README.md index 94b043ae48..a9c0869901 100644 --- a/examples/README.md +++ b/examples/README.md @@ -12,6 +12,8 @@ This folder contains example scripts showing how to use Node Redis in different | `search-hashes.js` | Uses [RediSearch](https://redisearch.io) to index and search data in hashes | | `search-json.js` | Uses [RediSearch](https://redisearch.io/) and [RedisJSON](https://redisjson.io/) to index and search JSON data | | `set-scan.js` | An example script that shows how to use the SSCAN iterator functionality | +| `stream-producer.js` | Adds entries to a [Redis Stream](https://redis.io/topics/streams-intro) using the `XADD` command | +| `stream-consumer.js` | Reads entries from a [Redis Stream](https://redis.io/topics/streams-intro) using the blocking `XREAD` command | ## Contributing @@ -24,7 +26,7 @@ To set up the examples folder so that you can run an example / develop one of yo ``` $ git clone https://github.com/redis/node-redis.git $ cd node-redis -$ npm install -ws && npm run build +$ npm install -ws && npm run build-all $ cd examples $ npm install ``` diff --git a/examples/stream-consumer.js b/examples/stream-consumer.js new file mode 100644 index 0000000000..4e0f140c0d --- /dev/null +++ b/examples/stream-consumer.js @@ -0,0 +1,65 @@ +// A sample stream consumer using the blocking variant of XREAD. +// This consumes entries from a stream created by stream-producer.js + +import { createClient, commandOptions } from 'redis'; + +async function streamConsumer() { + const client = createClient(); + + await client.connect(); + + let currentId = '0-0'; // Start at lowest possible stream ID + + while (true) { + try { + let response = await client.xRead( + commandOptions({ + isolated: true + }), [ + // XREAD can read from multiple streams, starting at a + // different ID for each... + { + key: 'mystream', + id: currentId + } + ], { + // Read 1 entry at a time, block for 5 seconds if there are none. + COUNT: 1, + BLOCK: 5000 + } + ); + + if (response) { + // Response is an array of streams, each containing an array of + // entries: + // [ + // { + // "name": "mystream", + // "messages": [ + // { + // "id": "1642088708425-0", + // "message": { + // "num": "999" + // } + // } + // ] + // } + // ] + console.log(JSON.stringify(response)); + + // Get the ID of the first (only) entry returned. + currentId = response[0].messages[0].id; + console.log(currentId); + } else { + // Response is null, we have read everything that is + // in the stream right now... + console.log('No new stream entries.'); + } + } + } catch (err) { + console.error(err); + } +} + +streamConsumer(); + diff --git a/examples/stream-producer.js b/examples/stream-producer.js new file mode 100644 index 0000000000..3f7a4a963c --- /dev/null +++ b/examples/stream-producer.js @@ -0,0 +1,28 @@ +// A sample stream producer using XADD. + +import { createClient } from 'redis'; + +async function streamProducer() { + const client = createClient(); + + await client.connect(); + await client.del('mystream'); + + let num = 0; + + while (num < 1000) { + // * = Let Redis generate a timestamp ID for this new entry. + let id = await client.xAdd('mystream', '*', { + num: `${num}` + // Other name/value pairs can go here as required... + }); + + console.log(`Added ${id} to the stream.`); + num += 1; + } + + await client.quit(); +} + +streamProducer(); +