Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
1e0ccf7
ci: compile main sources in coverage_report job
sonus21 May 2, 2026
65dfd14
Merge branch 'master' of github.com:/sonus21/rqueue
sonus21 May 3, 2026
5186137
nats-web: implement pause / soft-delete admin ops and capability-awar…
sonus21 May 5, 2026
42eb61e
nats-web: capability-aware nav / charts and stream-based peek
sonus21 May 5, 2026
2c15390
nats-web: backend-aware data-type labels and Limits-aware queue size
sonus21 May 5, 2026
81b4470
nats-web: position-based pending estimate for Limits streams, render …
sonus21 May 5, 2026
0d040b1
nats-web: per-consumer pending breakdown for Limits-retention streams
sonus21 May 5, 2026
c2dbfc2
nats-web: consumer-level Subscribers + Terminal Storage redesign
sonus21 May 5, 2026
60f6f33
nats-web: queue-detail redesign — hero, chip strip, subscriber & term…
sonus21 May 5, 2026
a6a9e69
fix: reuse single consumer for workqueue streams
sonus21 May 5, 2026
a7a3054
Revert "fix: reuse single consumer for workqueue streams"
sonus21 May 5, 2026
4fdd11b
nats-web: tighten queue-detail layout, add play/pause action button
sonus21 May 5, 2026
21ecfb5
fix: use single consumer-name suffix in resolvedConsumerName
sonus21 May 5, 2026
7bcc30d
nats-web: consumer-aware peek for Limits-retention streams
sonus21 May 5, 2026
0a4eb5c
nats-web: peek from ackFloor, not delivered.streamSeq
sonus21 May 5, 2026
524fc5c
fix: ack/nack target wrong NATS Message under multi-consumer fan-out
sonus21 May 5, 2026
f83a821
nats: regression test for inFlight key-collision under fan-out
sonus21 May 5, 2026
ec99465
nats-web: align Pending column with explorer + add Workers column
sonus21 May 5, 2026
deb1040
nats: tests for consumer-aware peek + adapt QueueModeIT to new contract
sonus21 May 5, 2026
1fc9693
nats-web: keep Pending column as numPending (yet-to-deliver only)
sonus21 May 5, 2026
e523b6d
Merge branch 'master' of github.com:/sonus21/rqueue into nats-v2-web
sonus21 May 6, 2026
4c777f1
core: expose message-converter exception to middleware via Job
sonus21 May 7, 2026
4e20b75
web: fix tests for consumer-aware getExplorePageData + peek signatures
sonus21 May 7, 2026
de928e0
build: bump version to 4.0.0-RC6 and publish nats/redis/web modules
sonus21 May 7, 2026
dc99997
docs: changelog for 4.0.0 (date TBD)
sonus21 May 7, 2026
28194a9
docs: backfill RC3/RC4 changelog entries and trim 4.0.0 scope
sonus21 May 7, 2026
f6ee247
docs: split 4.0.0 entry into RC5 (web fixes + multi-consumer fix) and…
sonus21 May 7, 2026
11593ad
docs: correct RC3 changelog — Java 17 baseline (lowered from 21)
sonus21 May 7, 2026
ecd9070
Apply Palantir Java Format
github-actions[bot] May 7, 2026
c1bb3da
nats: fix stale deleteMessage IT to assert tombstone behaviour
sonus21 May 7, 2026
faa9926
web: replace Pebble templates with plain Java HTML renderer
sonus21 May 7, 2026
74a28ff
nats-web: hide Explore Data and Move Messages on utility page
sonus21 May 7, 2026
d4fb731
web: add @JsonProperty to request fields with camelCase names
sonus21 May 8, 2026
b85937b
web: guard getExplorePageData against null QueueConfig
sonus21 May 8, 2026
f865fd2
build: pin lettuce-core to explicit version to fix Netty 4.1/4.2 conf…
sonus21 May 8, 2026
6d51b24
build: bump version to 4.0.0-RC7 and update CHANGELOG
sonus21 May 8, 2026
b71a7c9
Merge branch 'nats-v2-web' of github.com:/sonus21/rqueue into nats-v2…
sonus21 May 8, 2026
e99dec9
Apply Palantir Java Format
github-actions[bot] May 8, 2026
77039e9
nats: remove duplicateWindow, implement moveMessage/enqueueMessage, a…
sonus21 May 9, 2026
4452afe
build: bump version to 4.0.0-RC8 and update CHANGELOG
sonus21 May 9, 2026
a557db4
merge: resolve conflict in JetStreamMessageBroker — keep ADR-51 heade…
sonus21 May 9, 2026
6b4ad83
Apply Palantir Java Format
github-actions[bot] May 9, 2026
c01bdf5
docs: overhaul NATS configuration page for RC8
sonus21 May 9, 2026
5517bec
docs+fix: update RC7 changelog and fix RqueueViewControllerTest compi…
sonus21 May 9, 2026
0f3f5fa
Merge branch 'nats-v2-web' of github.com:/sonus21/rqueue into nats-v2…
sonus21 May 9, 2026
3bc5f8b
fix: null-guard serverInfo in NatsProvisioner to fix unit tests
sonus21 May 9, 2026
126497d
Apply Palantir Java Format
github-actions[bot] May 9, 2026
b936806
test: skip scheduled-message E2E tests when NATS server < 2.12
sonus21 May 9, 2026
bc3a397
test: add unit tests for NATS KV, lock, metrics, worker-registry, and…
sonus21 May 9, 2026
1b8a279
test: add 100 unit tests across NATS DAO, repository, and metadata-se…
sonus21 May 9, 2026
1182de4
test: add coverage-boosting tests for rqueue-spring, rqueue-web, rque…
sonus21 May 9, 2026
68f22fd
Apply Palantir Java Format
github-actions[bot] May 9, 2026
d882431
fixed test
sonus21 May 9, 2026
ace5e3c
Merge branch 'nats-v2-web' of github.com:/sonus21/rqueue into nats-v2…
sonus21 May 9, 2026
0246526
fix(nats): correct delayed-queue scheduling implementation (ADR-51)
sonus21 May 9, 2026
626e3fd
chore: resolve merge conflicts — keep nats-scheduling-fix changes
sonus21 May 9, 2026
9b2e666
Apply Palantir Java Format
github-actions[bot] May 9, 2026
0dd63ff
fix(nats): merge sched-wildcard subjects when upgrading stream to sch…
sonus21 May 9, 2026
2aea3b7
test(nats): add IT for enqueue-first-then-enqueueWithDelay ordering bug
sonus21 May 9, 2026
e823a2d
test(nats): add Spring Boot E2E test for enqueue-then-enqueueIn strea…
sonus21 May 9, 2026
bf71e7e
ci: log broker info before tests and upload server logs as artifacts
sonus21 May 9, 2026
40e4e24
Apply Palantir Java Format
github-actions[bot] May 9, 2026
17af14d
test(nats): isolate each Spring Boot E2E class with a per-class strea…
sonus21 May 9, 2026
20a0ccf
feat(nats): wire NATS-native dead-letter advisory bridge per queue at…
sonus21 May 9, 2026
6ead9c2
Merge branch 'nats-scheduling-fix' of github.com:/sonus21/rqueue into…
sonus21 May 9, 2026
3c9fedc
Apply Palantir Java Format
github-actions[bot] May 9, 2026
e2ff476
test(nats): tighten concurrentRetryOnRecurringMessageNoDuplicates to …
sonus21 May 9, 2026
e1600ba
fix(metrics): disambiguate gauges and counters per consumer on multi-…
sonus21 May 10, 2026
151ec0f
Merge branch 'nats-scheduling-fix' of github.com:/sonus21/rqueue into…
sonus21 May 10, 2026
64b6860
Apply Palantir Java Format
github-actions[bot] May 10, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 71 additions & 9 deletions .github/workflows/java-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,25 @@ jobs:
sudo apt-get update
sudo apt-get install -y redis-server
redis-cli --version
sudo systemctl stop redis-server
redis-server --logfile /tmp/redis.log --daemonize yes

- name: Log Redis info
run: |
redis-server --version
redis-cli INFO server

- name: Run producer-only tests
run: ./gradlew test -DincludeTags=producerOnly

- name: Upload redis-server log
if: always()
uses: actions/upload-artifact@v4
with:
name: producer-redis-server
path: /tmp/redis.log
if-no-files-found: ignore

- name: Upload JaCoCo exec data
if: always()
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -189,10 +204,25 @@ jobs:
sudo apt-get update
sudo apt-get install -y redis-server
redis-cli --version
sudo systemctl stop redis-server
redis-server --logfile /tmp/redis.log --daemonize yes

- name: Log Redis info
run: |
redis-server --version
redis-cli INFO server

- name: Run integration tests
run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local

- name: Upload redis-server log
if: always()
uses: actions/upload-artifact@v4
with:
name: integration-redis-server
path: /tmp/redis.log
if-no-files-found: ignore

- name: Upload JaCoCo exec data
if: always()
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -245,12 +275,12 @@ jobs:
- name: Setup Redis Cluster
run: |
mkdir 9000 9001 9002 9003 9004 9005
printf "port 9000 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9000/redis.conf
printf "port 9001 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9001/redis.conf
printf "port 9002 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9002/redis.conf
printf "port 9003 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9003/redis.conf
printf "port 9004 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9004/redis.conf
printf "port 9005 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes" >> 9005/redis.conf
printf "port 9000 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9000.log" >> 9000/redis.conf
printf "port 9001 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9001.log" >> 9001/redis.conf
printf "port 9002 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9002.log" >> 9002/redis.conf
printf "port 9003 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9003.log" >> 9003/redis.conf
printf "port 9004 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9004.log" >> 9004/redis.conf
printf "port 9005 \ncluster-enabled yes \ncluster-config-file nodes.conf \ncluster-node-timeout 5000 \nappendonly yes\nlogfile /tmp/redis-9005.log" >> 9005/redis.conf
(cd 9000 && redis-server ./redis.conf) &
(cd 9001 && redis-server ./redis.conf) &
(cd 9002 && redis-server ./redis.conf) &
Expand All @@ -260,9 +290,23 @@ jobs:
sleep 30
yes yes | redis-cli --cluster create 127.0.0.1:9000 127.0.0.1:9001 127.0.0.1:9002 127.0.0.1:9003 127.0.0.1:9004 127.0.0.1:9005 --cluster-replicas 1

- name: Log Redis cluster info
run: |
redis-server --version
redis-cli -p 9000 INFO server | grep -E 'redis_version|os|tcp_port|uptime_in_seconds'
redis-cli -p 9000 CLUSTER INFO

- name: Run Redis cluster tests
run: ./gradlew test -DincludeTags=redisCluster

- name: Upload redis-server logs
if: always()
uses: actions/upload-artifact@v4
with:
name: redis-server-cluster
path: /tmp/redis-*.log
if-no-files-found: ignore

- name: Upload JaCoCo exec data
if: always()
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -313,10 +357,25 @@ jobs:
sudo apt-get update
sudo apt-get install -y redis-server
redis-cli --version
sudo systemctl stop redis-server
redis-server --logfile /tmp/redis.log --daemonize yes

- name: Log Redis info
run: |
redis-server --version
redis-cli INFO server

- name: Run reactive integration tests
run: ./gradlew test -DincludeTags=integration -DexcludeTags=redisCluster,producerOnly,local

- name: Upload redis-server log
if: always()
uses: actions/upload-artifact@v4
with:
name: reactive-redis-server
path: /tmp/redis.log
if-no-files-found: ignore

- name: Upload JaCoCo exec data
if: always()
uses: actions/upload-artifact@v4
Expand Down Expand Up @@ -365,7 +424,7 @@ jobs:
# instead of pulling a Testcontainers image.
- name: Install nats-server
run: |
NATS_VERSION=v2.10.22
NATS_VERSION=v2.12.0
curl -sSL "https://github.com/nats-io/nats-server/releases/download/${NATS_VERSION}/nats-server-${NATS_VERSION}-linux-amd64.tar.gz" \
| tar -xz -C /tmp
sudo mv "/tmp/nats-server-${NATS_VERSION}-linux-amd64/nats-server" /usr/local/bin/nats-server
Expand All @@ -374,14 +433,17 @@ jobs:
- name: Start nats-server
run: |
mkdir -p /tmp/jetstream
nohup nats-server -js -sd /tmp/jetstream -p 4222 > /tmp/nats.log 2>&1 &
nats-server -js -sd /tmp/jetstream -p 4222 > /tmp/nats.log 2>&1 &
for i in $(seq 1 20); do
if (echo > /dev/tcp/127.0.0.1/4222) 2>/dev/null; then
echo "nats-server ready after ${i}s"; break
fi
sleep 1
done

- name: Log NATS server info
run: nats-server --version

- name: Run NATS tests
env:
NATS_RUNNING: "true"
Expand All @@ -392,7 +454,7 @@ jobs:
if: always()
uses: actions/upload-artifact@v4
with:
name: nats-server-log
name: nats-server
path: /tmp/nats.log
if-no-files-found: ignore

Expand Down
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Never remove or increment the base version number. The human decides when the of
<claude-mem-context>
# Memory Context

# [rqueue] recent context, 2026-05-09 2:28pm GMT+5:30
# [rqueue] recent context, 2026-05-09 9:08pm GMT+5:30

No previous sessions found.
</claude-mem-context>
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,15 @@ private void updateCounter(boolean fail) {
if (Objects.isNull(counter)) {
return;
}
// Pass the consumer name so the counter can route increments to the per-consumer entry
// when multiple @RqueueListener methods share a queue with distinct consumerName overrides;
// null/empty (no override) routes to the bare-queue counter unchanged.
String queueName = job.getQueueDetail().getName();
String consumerName = job.getQueueDetail().getConsumerName();
if (fail) {
counter.updateFailureCount(job.getQueueDetail().getName());
counter.updateFailureCount(queueName, consumerName);
} else {
counter.updateExecutionCount(job.getQueueDetail().getName());
counter.updateExecutionCount(queueName, consumerName);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@
/**
* Queue counter counts the different types of events related to a queue. Failure and execution
* count, it supports queue registrations.
*
* <p><b>Multi-consumer keying.</b> Two {@code @RqueueListener} methods on the same queue with
* different {@code consumerName} overrides each produce a distinct {@link QueueDetail} and need
* their own counters. The maps are keyed by {@code queueName##consumerName} (or just
* {@code queueName} when no override is set) so the second registration does not silently
* overwrite the first. Increment lookups follow the same composite key — see
* {@link #updateFailureCount(String, String)}.
*/
public class QueueCounter {

Expand All @@ -37,40 +44,70 @@ public class QueueCounter {
private final Map<String, Counter> queueNameToFailureCounter = new HashMap<>();
private final Map<String, Counter> queueNameToExecutionCounter = new HashMap<>();

private void updateCounter(Map<String, Counter> map, String queueName) {
Counter counter = map.get(queueName);
private static String key(String queueName, String consumerName) {
return (consumerName == null || consumerName.isEmpty())
? queueName
: queueName + "##" + consumerName;
}

private void updateCounter(Map<String, Counter> map, String queueName, String consumerName) {
// Try the consumer-specific entry first; fall back to the bare queue-name entry so callers
// that don't yet pass a consumer name (older paths, single-consumer queues) still work.
Counter counter = map.get(key(queueName, consumerName));
if (counter == null && consumerName != null && !consumerName.isEmpty()) {
counter = map.get(queueName);
}
if (counter == null) {
return;
}
counter.increment();
}

/** Backward-compatible single-arg increment; route to the bare-queue counter only. */
void updateFailureCount(String queueName) {
updateCounter(queueNameToFailureCounter, queueName);
updateCounter(queueNameToFailureCounter, queueName, null);
}

/** Backward-compatible single-arg increment; route to the bare-queue counter only. */
void updateExecutionCount(String queueName) {
updateCounter(queueNameToExecutionCounter, queueName);
updateCounter(queueNameToExecutionCounter, queueName, null);
}

/**
* Consumer-aware increment: increments the counter registered for
* {@code (queueName, consumerName)}, falling back to the bare {@code queueName} counter when no
* consumer-scoped entry exists. Use this from
* {@link com.github.sonus21.rqueue.listener.RqueueExecutor} (which has the {@link QueueDetail})
* so multi-consumer queues keep accurate per-consumer counts.
*/
void updateFailureCount(String queueName, String consumerName) {
updateCounter(queueNameToFailureCounter, queueName, consumerName);
}

/** Consumer-aware execution-count increment. See {@link #updateFailureCount(String, String)}. */
void updateExecutionCount(String queueName, String consumerName) {
updateCounter(queueNameToExecutionCounter, queueName, consumerName);
}

void registerQueue(
MetricsProperties metricsProperties,
Tags queueTags,
MeterRegistry registry,
QueueDetail queueDetail) {
String mapKey = key(queueDetail.getName(), queueDetail.getConsumerName());
if (metricsProperties.countFailure()) {
Counter.Builder builder = Counter.builder(metricsProperties.getMetricName(FAILURE_COUNT))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
.description("Failure count");
Counter counter = builder.register(registry);
queueNameToFailureCounter.put(queueDetail.getName(), counter);
queueNameToFailureCounter.put(mapKey, counter);
}
if (metricsProperties.countExecution()) {
Counter.Builder builder = Counter.builder(metricsProperties.getMetricName(EXECUTION_COUNT))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
.description("Task execution count");
Counter counter = builder.register(registry);
queueNameToExecutionCounter.put(queueDetail.getName(), counter);
queueNameToExecutionCounter.put(mapKey, counter);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,14 @@ public void updateFailureCount(String queueName) {
public void updateExecutionCount(String queueName) {
queueCounter.updateExecutionCount(queueName);
}

@Override
public void updateFailureCount(String queueName, String consumerName) {
queueCounter.updateFailureCount(queueName, consumerName);
}

@Override
public void updateExecutionCount(String queueName, String consumerName) {
queueCounter.updateExecutionCount(queueName, consumerName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@
public class RqueueMetrics implements RqueueMetricsRegistry {

static final String QUEUE_KEY = "key";
/**
* Tag added when a {@link QueueDetail} declares a {@code consumerName} override. Without this
* tag, two {@code @RqueueListener} methods on the same queue with different consumer names
* register gauges with identical (name, tag-set) pairs and Micrometer silently keeps only the
* first — losing the second consumer's metrics entirely.
*/
static final String CONSUMER_KEY = "consumer";

private static final String QUEUE_SIZE = "queue.size";
private static final String SCHEDULED_QUEUE_SIZE = "scheduled.queue.size";
private static final String PROCESSING_QUEUE_SIZE = "processing.queue.size";
Expand All @@ -58,17 +66,31 @@ private void monitor() {
for (QueueDetail queueDetail : EndpointRegistry.getActiveQueueDetails()) {
Tags queueTags =
Tags.concat(metricsProperties.getMetricTags(), "queue", queueDetail.getName());
// When a queue carries multiple consumers (multiple @RqueueListener with distinct
// consumerName overrides), each gets its own QueueDetail. Without a `consumer` tag the
// gauges would share the same (name, tags) and Micrometer would drop all but the first.
String consumerName = queueDetail.getConsumerName();
boolean hasConsumerOverride = consumerName != null && !consumerName.isEmpty();
if (hasConsumerOverride) {
queueTags = queueTags.and(CONSUMER_KEY, consumerName);
}
Gauge.builder(
metricsProperties.getMetricName(QUEUE_SIZE),
queueDetail,
c -> queueMetricsProvider.getPendingMessageCount(queueDetail.getName()))
c -> hasConsumerOverride
? queueMetricsProvider.getPendingMessageCountByConsumer(
queueDetail.getName(), consumerName)
: queueMetricsProvider.getPendingMessageCount(queueDetail.getName()))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getQueueName()))
.description("The number of entries in this queue")
.register(meterRegistry);
Gauge.builder(
metricsProperties.getMetricName(PROCESSING_QUEUE_SIZE),
queueDetail,
c -> queueMetricsProvider.getProcessingMessageCount(queueDetail.getName()))
c -> hasConsumerOverride
? queueMetricsProvider.getProcessingMessageCountByConsumer(
queueDetail.getName(), consumerName)
: queueMetricsProvider.getProcessingMessageCount(queueDetail.getName()))
.tags(queueTags.and(QUEUE_KEY, queueDetail.getProcessingQueueName()))
.description("The number of entries in the processing queue")
.register(meterRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,21 @@ public interface RqueueMetricsCounter {
void updateFailureCount(String queueName);

void updateExecutionCount(String queueName);

/**
* Consumer-aware failure increment. When a queue carries multiple {@code @RqueueListener}
* methods with distinct {@code consumerName} overrides, each consumer has its own counter
* registered under {@code (queueName, consumerName)}; calling the bare-queue overload would
* route every increment to the same (last-registered) counter and silently lose per-consumer
* counts. Defaults to the queue-level path so callers that don't have a consumer name keep
* working unchanged.
*/
default void updateFailureCount(String queueName, String consumerName) {
updateFailureCount(queueName);
}

/** Consumer-aware execution increment. See {@link #updateFailureCount(String, String)}. */
default void updateExecutionCount(String queueName, String consumerName) {
updateExecutionCount(queueName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,25 @@ default long getScheduledMessageCount(String queueName, String priority) {
default long getProcessingMessageCount(String queueName, String priority) {
return getProcessingMessageCount(queueName);
}

/**
* Per-consumer variant of {@link #getPendingMessageCount(String)}. When two
* {@code @RqueueListener} methods on the same queue declare different {@code consumerName}
* overrides, each gets its own QueueDetail and its own metric registration; backends that can
* report per-consumer pending depth (e.g. NATS JetStream Limits/Interest streams or any
* fan-out broker) should override this. The default delegates to the queue-level call so
* single-consumer queues, and backends without per-consumer state, behave unchanged.
*
* @param queueName user-facing queue name
* @param consumerName consumer-name override from {@code @RqueueListener(consumerName=...)};
* {@code null} or empty when no override is set
*/
default long getPendingMessageCountByConsumer(String queueName, String consumerName) {
return getPendingMessageCount(queueName);
}

/** Per-consumer variant of {@link #getProcessingMessageCount(String)}. See related javadoc. */
default long getProcessingMessageCountByConsumer(String queueName, String consumerName) {
return getProcessingMessageCount(queueName);
}
}
Loading