diff --git a/examples/README.md b/examples/README.md index c96571241e..c3a54c1102 100644 --- a/examples/README.md +++ b/examples/README.md @@ -19,6 +19,7 @@ This folder contains example scripts showing how to use Node Redis in different | `stream-consumer.js` | Reads entries from a [Redis Stream](https://redis.io/topics/streams-intro) using the blocking `XREAD` command | | `time-series.js` | Create, populate and query timeseries data with [Redis Timeseries](https://redistimeseries.io) | | `topk.js` | Use the [RedisBloom](https://redisbloom.io) TopK to track the most frequently seen items. | +| `stream-consumer-group.js` | Reads entties from a [Redis Stream](https://redis.io/topics/streams-intro) as part of a consumer group using the blocking `XREADGROUP` command | ## Contributing diff --git a/examples/stream-consumer-group.js b/examples/stream-consumer-group.js new file mode 100644 index 0000000000..0dc8ff20fe --- /dev/null +++ b/examples/stream-consumer-group.js @@ -0,0 +1,104 @@ +// A sample stream consumer using the blocking variant of XREADGROUP. +// This consumer works in collaboration with other instances of itself +// in the same consumer group such that the group as a whole receives +// every entry from the stream. +// +// This consumes entries from a stream created by stream-producer.js +// +// Run this as follows: +// +// $ node stream-consumer-group.js +// +// Run multiple instances with different values of +// to see them processing the stream as a group: +// +// $ node stream-consumer-group.js consumer1 +// +// In another terminal: +// +// $ node stream-consumer-group.js consumer2 + +import { createClient, commandOptions } from 'redis'; + +async function streamConsumerGroup() { + const client = createClient(); + + if (process.argv.length !== 3) { + console.log(`usage: node stream-consumer-group.js `); + process.exit(1); + } + + const consumerName = process.argv[2]; + + await client.connect(); + + // Create the consumer group (and stream) if needed... + try { + await client.xGroupCreate('mystream', 'myconsumergroup', '0', { + MKSTREAM: true + }); + console.log('Created consumer group.'); + } catch (e) { + console.log('Consumer group already exists, skipped creation.'); + } + + console.log(`Starting consumer ${consumerName}.`); + + while (true) { + try { + let response = await client.xReadGroup( + commandOptions({ + isolated: true + }), + 'myconsumergroup', + consumerName, [ + // XREADGROUP can read from multiple streams, starting at a + // different ID for each... + { + key: 'mystream', + id: '>' // Next entry ID that no consumer in this group has read + } + ], { + // Read 1 entry at a time, block for 5 seconds if there are none. + COUNT: 1, + BLOCK: 5000 + } + ); + + if (response) { + // Response is an array of streams, each containing an array of + // entries: + // + // [ + // { + // "name": "mystream", + // "messages": [ + // { + // "id": "1642088708425-0", + // "message": { + // "num": "999" + // } + // } + // ] + // } + // ] + console.log(JSON.stringify(response)); + + // Use XACK to acknowledge successful processing of this + // stream entry. + const entryId = response[0].messages[0].id; + await client.xAck('mystream', 'myconsumergroup', entryId); + + console.log(`Acknowledged processing of entry ${entryId}.`); + } else { + // Response is null, we have read everything that is + // in the stream right now... + console.log('No new stream entries.'); + } + } catch (err) { + console.error(err); + } + } +} + +streamConsumerGroup(); diff --git a/examples/stream-producer.js b/examples/stream-producer.js index 3f7a4a963c..42c5d14bb2 100644 --- a/examples/stream-producer.js +++ b/examples/stream-producer.js @@ -6,7 +6,6 @@ async function streamProducer() { const client = createClient(); await client.connect(); - await client.del('mystream'); let num = 0;