You've already forked node-redis
mirror of
https://github.com/redis/node-redis.git
synced 2025-07-31 05:44:24 +03:00
62 lines
1.5 KiB
JavaScript
62 lines
1.5 KiB
JavaScript
// A sample stream consumer using the blocking variant of XREAD.
|
|
// https://redis.io/commands/xread/
|
|
// This consumes entries from a stream created by stream-producer.js
|
|
|
|
import { createClient, commandOptions } from 'redis';
|
|
|
|
const client = createClient();
|
|
|
|
await client.connect();
|
|
|
|
let currentId = '0-0'; // Start at lowest possible stream ID
|
|
|
|
while (true) {
|
|
try {
|
|
let response = await client.xRead(
|
|
commandOptions({
|
|
isolated: true
|
|
}), [
|
|
// XREAD can read from multiple streams, starting at a
|
|
// different ID for each...
|
|
{
|
|
key: 'mystream',
|
|
id: currentId
|
|
}
|
|
], {
|
|
// Read 1 entry at a time, block for 5 seconds if there are none.
|
|
COUNT: 1,
|
|
BLOCK: 5000
|
|
}
|
|
);
|
|
|
|
if (response) {
|
|
// Response is an array of streams, each containing an array of
|
|
// entries:
|
|
// [
|
|
// {
|
|
// "name": "mystream",
|
|
// "messages": [
|
|
// {
|
|
// "id": "1642088708425-0",
|
|
// "message": {
|
|
// "num": "999"
|
|
// }
|
|
// }
|
|
// ]
|
|
// }
|
|
// ]
|
|
console.log(JSON.stringify(response));
|
|
|
|
// Get the ID of the first (only) entry returned.
|
|
currentId = response[0].messages[0].id;
|
|
console.log(currentId);
|
|
} else {
|
|
// Response is null, we have read everything that is
|
|
// in the stream right now...
|
|
console.log('No new stream entries.');
|
|
}
|
|
} catch (err) {
|
|
console.error(err);
|
|
}
|
|
}
|