You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-07 13:22:56 +03:00
Add streams XREADGROUP and XACK example. (#1832)
* Removed stream delete command to allow consumer group example to work. * Adds stream consumer group example. * Adds stream consumer group example code. * Update README.md Co-authored-by: Leibale Eidelman <leibale1998@gmail.com>
This commit is contained in:
@@ -19,6 +19,7 @@ This folder contains example scripts showing how to use Node Redis in different
|
|||||||
| `stream-consumer.js` | Reads entries from a [Redis Stream](https://redis.io/topics/streams-intro) using the blocking `XREAD` command |
|
| `stream-consumer.js` | Reads entries from a [Redis Stream](https://redis.io/topics/streams-intro) using the blocking `XREAD` command |
|
||||||
| `time-series.js` | Create, populate and query timeseries data with [Redis Timeseries](https://redistimeseries.io) |
|
| `time-series.js` | Create, populate and query timeseries data with [Redis Timeseries](https://redistimeseries.io) |
|
||||||
| `topk.js` | Use the [RedisBloom](https://redisbloom.io) TopK to track the most frequently seen items. |
|
| `topk.js` | Use the [RedisBloom](https://redisbloom.io) TopK to track the most frequently seen items. |
|
||||||
|
| `stream-consumer-group.js` | Reads entties from a [Redis Stream](https://redis.io/topics/streams-intro) as part of a consumer group using the blocking `XREADGROUP` command |
|
||||||
|
|
||||||
## Contributing
|
## Contributing
|
||||||
|
|
||||||
|
104
examples/stream-consumer-group.js
Normal file
104
examples/stream-consumer-group.js
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
// A sample stream consumer using the blocking variant of XREADGROUP.
|
||||||
|
// This consumer works in collaboration with other instances of itself
|
||||||
|
// in the same consumer group such that the group as a whole receives
|
||||||
|
// every entry from the stream.
|
||||||
|
//
|
||||||
|
// This consumes entries from a stream created by stream-producer.js
|
||||||
|
//
|
||||||
|
// Run this as follows:
|
||||||
|
//
|
||||||
|
// $ node stream-consumer-group.js <consumerName>
|
||||||
|
//
|
||||||
|
// Run multiple instances with different values of <consumerName>
|
||||||
|
// to see them processing the stream as a group:
|
||||||
|
//
|
||||||
|
// $ node stream-consumer-group.js consumer1
|
||||||
|
//
|
||||||
|
// In another terminal:
|
||||||
|
//
|
||||||
|
// $ node stream-consumer-group.js consumer2
|
||||||
|
|
||||||
|
import { createClient, commandOptions } from 'redis';
|
||||||
|
|
||||||
|
async function streamConsumerGroup() {
|
||||||
|
const client = createClient();
|
||||||
|
|
||||||
|
if (process.argv.length !== 3) {
|
||||||
|
console.log(`usage: node stream-consumer-group.js <consumerName>`);
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
const consumerName = process.argv[2];
|
||||||
|
|
||||||
|
await client.connect();
|
||||||
|
|
||||||
|
// Create the consumer group (and stream) if needed...
|
||||||
|
try {
|
||||||
|
await client.xGroupCreate('mystream', 'myconsumergroup', '0', {
|
||||||
|
MKSTREAM: true
|
||||||
|
});
|
||||||
|
console.log('Created consumer group.');
|
||||||
|
} catch (e) {
|
||||||
|
console.log('Consumer group already exists, skipped creation.');
|
||||||
|
}
|
||||||
|
|
||||||
|
console.log(`Starting consumer ${consumerName}.`);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
let response = await client.xReadGroup(
|
||||||
|
commandOptions({
|
||||||
|
isolated: true
|
||||||
|
}),
|
||||||
|
'myconsumergroup',
|
||||||
|
consumerName, [
|
||||||
|
// XREADGROUP can read from multiple streams, starting at a
|
||||||
|
// different ID for each...
|
||||||
|
{
|
||||||
|
key: 'mystream',
|
||||||
|
id: '>' // Next entry ID that no consumer in this group has read
|
||||||
|
}
|
||||||
|
], {
|
||||||
|
// Read 1 entry at a time, block for 5 seconds if there are none.
|
||||||
|
COUNT: 1,
|
||||||
|
BLOCK: 5000
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
if (response) {
|
||||||
|
// Response is an array of streams, each containing an array of
|
||||||
|
// entries:
|
||||||
|
//
|
||||||
|
// [
|
||||||
|
// {
|
||||||
|
// "name": "mystream",
|
||||||
|
// "messages": [
|
||||||
|
// {
|
||||||
|
// "id": "1642088708425-0",
|
||||||
|
// "message": {
|
||||||
|
// "num": "999"
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
// ]
|
||||||
|
// }
|
||||||
|
// ]
|
||||||
|
console.log(JSON.stringify(response));
|
||||||
|
|
||||||
|
// Use XACK to acknowledge successful processing of this
|
||||||
|
// stream entry.
|
||||||
|
const entryId = response[0].messages[0].id;
|
||||||
|
await client.xAck('mystream', 'myconsumergroup', entryId);
|
||||||
|
|
||||||
|
console.log(`Acknowledged processing of entry ${entryId}.`);
|
||||||
|
} else {
|
||||||
|
// Response is null, we have read everything that is
|
||||||
|
// in the stream right now...
|
||||||
|
console.log('No new stream entries.');
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
console.error(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
streamConsumerGroup();
|
@@ -6,7 +6,6 @@ async function streamProducer() {
|
|||||||
const client = createClient();
|
const client = createClient();
|
||||||
|
|
||||||
await client.connect();
|
await client.connect();
|
||||||
await client.del('mystream');
|
|
||||||
|
|
||||||
let num = 0;
|
let num = 0;
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user