You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-07-31 05:44:24 +03:00
Add stream examples. (#1830)
* Adds stream consumer and producer scripts to doc. * Updated build command example. * Adds stream producer example. * Adds basic stream consumer example. * Added isolated execution. * Update README.md * Update stream-consumer.js Co-authored-by: Leibale Eidelman <leibale1998@gmail.com>
This commit is contained in:
@ -12,6 +12,8 @@ This folder contains example scripts showing how to use Node Redis in different
|
||||
| `search-hashes.js` | Uses [RediSearch](https://redisearch.io) to index and search data in hashes |
|
||||
| `search-json.js` | Uses [RediSearch](https://redisearch.io/) and [RedisJSON](https://redisjson.io/) to index and search JSON data |
|
||||
| `set-scan.js` | An example script that shows how to use the SSCAN iterator functionality |
|
||||
| `stream-producer.js` | Adds entries to a [Redis Stream](https://redis.io/topics/streams-intro) using the `XADD` command |
|
||||
| `stream-consumer.js` | Reads entries from a [Redis Stream](https://redis.io/topics/streams-intro) using the blocking `XREAD` command |
|
||||
|
||||
## Contributing
|
||||
|
||||
@ -24,7 +26,7 @@ To set up the examples folder so that you can run an example / develop one of yo
|
||||
```
|
||||
$ git clone https://github.com/redis/node-redis.git
|
||||
$ cd node-redis
|
||||
$ npm install -ws && npm run build
|
||||
$ npm install -ws && npm run build-all
|
||||
$ cd examples
|
||||
$ npm install
|
||||
```
|
||||
|
65
examples/stream-consumer.js
Normal file
65
examples/stream-consumer.js
Normal file
@ -0,0 +1,65 @@
|
||||
// A sample stream consumer using the blocking variant of XREAD.
|
||||
// This consumes entries from a stream created by stream-producer.js
|
||||
|
||||
import { createClient, commandOptions } from 'redis';
|
||||
|
||||
async function streamConsumer() {
|
||||
const client = createClient();
|
||||
|
||||
await client.connect();
|
||||
|
||||
let currentId = '0-0'; // Start at lowest possible stream ID
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
let response = await client.xRead(
|
||||
commandOptions({
|
||||
isolated: true
|
||||
}), [
|
||||
// XREAD can read from multiple streams, starting at a
|
||||
// different ID for each...
|
||||
{
|
||||
key: 'mystream',
|
||||
id: currentId
|
||||
}
|
||||
], {
|
||||
// Read 1 entry at a time, block for 5 seconds if there are none.
|
||||
COUNT: 1,
|
||||
BLOCK: 5000
|
||||
}
|
||||
);
|
||||
|
||||
if (response) {
|
||||
// Response is an array of streams, each containing an array of
|
||||
// entries:
|
||||
// [
|
||||
// {
|
||||
// "name": "mystream",
|
||||
// "messages": [
|
||||
// {
|
||||
// "id": "1642088708425-0",
|
||||
// "message": {
|
||||
// "num": "999"
|
||||
// }
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// ]
|
||||
console.log(JSON.stringify(response));
|
||||
|
||||
// Get the ID of the first (only) entry returned.
|
||||
currentId = response[0].messages[0].id;
|
||||
console.log(currentId);
|
||||
} else {
|
||||
// Response is null, we have read everything that is
|
||||
// in the stream right now...
|
||||
console.log('No new stream entries.');
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
}
|
||||
}
|
||||
|
||||
streamConsumer();
|
||||
|
28
examples/stream-producer.js
Normal file
28
examples/stream-producer.js
Normal file
@ -0,0 +1,28 @@
|
||||
// A sample stream producer using XADD.
|
||||
|
||||
import { createClient } from 'redis';
|
||||
|
||||
async function streamProducer() {
|
||||
const client = createClient();
|
||||
|
||||
await client.connect();
|
||||
await client.del('mystream');
|
||||
|
||||
let num = 0;
|
||||
|
||||
while (num < 1000) {
|
||||
// * = Let Redis generate a timestamp ID for this new entry.
|
||||
let id = await client.xAdd('mystream', '*', {
|
||||
num: `${num}`
|
||||
// Other name/value pairs can go here as required...
|
||||
});
|
||||
|
||||
console.log(`Added ${id} to the stream.`);
|
||||
num += 1;
|
||||
}
|
||||
|
||||
await client.quit();
|
||||
}
|
||||
|
||||
streamProducer();
|
||||
|
Reference in New Issue
Block a user