You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-08-06 02:15:48 +03:00
Updates examples. (#2219)
* Updates examples. * Added command link for hset.
This commit is contained in:
@@ -1,4 +1,6 @@
|
||||
// A sample stream consumer using the blocking variant of XREADGROUP.
|
||||
// https://redis.io/commands/xreadgroup/
|
||||
|
||||
// This consumer works in collaboration with other instances of itself
|
||||
// in the same consumer group such that the group as a whole receives
|
||||
// every entry from the stream.
|
||||
@@ -20,85 +22,84 @@
|
||||
|
||||
import { createClient, commandOptions } from 'redis';
|
||||
|
||||
async function streamConsumerGroup() {
|
||||
const client = createClient();
|
||||
const client = createClient();
|
||||
|
||||
if (process.argv.length !== 3) {
|
||||
console.log(`usage: node stream-consumer-group.js <consumerName>`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const consumerName = process.argv[2];
|
||||
|
||||
await client.connect();
|
||||
|
||||
// Create the consumer group (and stream) if needed...
|
||||
try {
|
||||
await client.xGroupCreate('mystream', 'myconsumergroup', '0', {
|
||||
MKSTREAM: true
|
||||
});
|
||||
console.log('Created consumer group.');
|
||||
} catch (e) {
|
||||
console.log('Consumer group already exists, skipped creation.');
|
||||
}
|
||||
|
||||
console.log(`Starting consumer ${consumerName}.`);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
let response = await client.xReadGroup(
|
||||
commandOptions({
|
||||
isolated: true
|
||||
}),
|
||||
'myconsumergroup',
|
||||
consumerName, [
|
||||
// XREADGROUP can read from multiple streams, starting at a
|
||||
// different ID for each...
|
||||
{
|
||||
key: 'mystream',
|
||||
id: '>' // Next entry ID that no consumer in this group has read
|
||||
}
|
||||
], {
|
||||
// Read 1 entry at a time, block for 5 seconds if there are none.
|
||||
COUNT: 1,
|
||||
BLOCK: 5000
|
||||
}
|
||||
);
|
||||
|
||||
if (response) {
|
||||
// Response is an array of streams, each containing an array of
|
||||
// entries:
|
||||
//
|
||||
// [
|
||||
// {
|
||||
// "name": "mystream",
|
||||
// "messages": [
|
||||
// {
|
||||
// "id": "1642088708425-0",
|
||||
// "message": {
|
||||
// "num": "999"
|
||||
// }
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// ]
|
||||
console.log(JSON.stringify(response));
|
||||
|
||||
// Use XACK to acknowledge successful processing of this
|
||||
// stream entry.
|
||||
const entryId = response[0].messages[0].id;
|
||||
await client.xAck('mystream', 'myconsumergroup', entryId);
|
||||
|
||||
console.log(`Acknowledged processing of entry ${entryId}.`);
|
||||
} else {
|
||||
// Response is null, we have read everything that is
|
||||
// in the stream right now...
|
||||
console.log('No new stream entries.');
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
}
|
||||
}
|
||||
if (process.argv.length !== 3) {
|
||||
console.log(`usage: node stream-consumer-group.js <consumerName>`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
streamConsumerGroup();
|
||||
const consumerName = process.argv[2];
|
||||
|
||||
await client.connect();
|
||||
|
||||
// Create the consumer group (and stream) if needed...
|
||||
try {
|
||||
// https://redis.io/commands/xgroup-create/
|
||||
await client.xGroupCreate('mystream', 'myconsumergroup', '0', {
|
||||
MKSTREAM: true
|
||||
});
|
||||
console.log('Created consumer group.');
|
||||
} catch (e) {
|
||||
console.log('Consumer group already exists, skipped creation.');
|
||||
}
|
||||
|
||||
console.log(`Starting consumer ${consumerName}.`);
|
||||
|
||||
while (true) {
|
||||
try {
|
||||
// https://redis.io/commands/xreadgroup/
|
||||
let response = await client.xReadGroup(
|
||||
commandOptions({
|
||||
isolated: true
|
||||
}),
|
||||
'myconsumergroup',
|
||||
consumerName, [
|
||||
// XREADGROUP can read from multiple streams, starting at a
|
||||
// different ID for each...
|
||||
{
|
||||
key: 'mystream',
|
||||
id: '>' // Next entry ID that no consumer in this group has read
|
||||
}
|
||||
], {
|
||||
// Read 1 entry at a time, block for 5 seconds if there are none.
|
||||
COUNT: 1,
|
||||
BLOCK: 5000
|
||||
}
|
||||
);
|
||||
|
||||
if (response) {
|
||||
// Response is an array of streams, each containing an array of
|
||||
// entries:
|
||||
//
|
||||
// [
|
||||
// {
|
||||
// "name": "mystream",
|
||||
// "messages": [
|
||||
// {
|
||||
// "id": "1642088708425-0",
|
||||
// "message": {
|
||||
// "num": "999"
|
||||
// }
|
||||
// }
|
||||
// ]
|
||||
// }
|
||||
// ]
|
||||
console.log(JSON.stringify(response));
|
||||
|
||||
// Use XACK to acknowledge successful processing of this
|
||||
// stream entry.
|
||||
// https://redis.io/commands/xack/
|
||||
const entryId = response[0].messages[0].id;
|
||||
await client.xAck('mystream', 'myconsumergroup', entryId);
|
||||
|
||||
console.log(`Acknowledged processing of entry ${entryId}.`);
|
||||
} else {
|
||||
// Response is null, we have read everything that is
|
||||
// in the stream right now...
|
||||
console.log('No new stream entries.');
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(err);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user