taoensso.carmine.message-queue

Carmine-backed Clojure message queue, v2.
All heavy lifting by Redis.

Uses an optimized message circle architecture that is simple, reliable,
and has pretty good throughput and latency.

See `mq-diagram.svg` in repo for diagram of architecture.
Ref. http://antirez.com/post/250 for initial inspiration.

Message status e/o:
  nil                  - Not in queue or already GC'd
  :queued              - Awaiting handler
  :queued-with-backoff - Awaiting handler, but skip until backoff expired
  :locked              - Currently with handler
  :locked-with-requeue - Currently with handler, will requeue when done
  :done-awaiting-gc    - Finished handling, awaiting GC
  :done-with-backoff   - Finished handling, awaiting GC,
                         but skip until dedupe backoff expired
  :done-with-requeue   - Will requeue, but skip until dedupe backoff expired

Redis keys (all prefixed with `carmine:mq:<qname>:`):
  * messages      - hash: {mid mcontent} ; Message content
  * messages-rq   - hash: {mid mcontent} ; '' for requeues
  * lock-times    - hash: {mid lock-ms}  ; Optional mid-specific lock duration
  * lock-times-rq - hash: {mid lock-ms}  ; '' for requeues
  * udts          - hash: {mid  udt-first-enqueued}
  * locks         - hash: {mid    lock-expiry-time} ; Active locks
  * backoffs      - hash: {mid backoff-expiry-time} ; Active backoffs
  * nattempts     - hash: {mid attempt-count}
  * done          - mid set: awaiting gc, etc.
  * requeue       - mid set: awaiting requeue ; Deprecated

  * mids-ready    - list: mids for immediate handling     (push to left, pop from right)
  * mid-circle    - list: mids for maintenance processing (push to left, pop from right)
  * ndry-runs     - int: num times worker(s) have lapped queue w/o work to do

  * isleep-a      - list: 0/1 sentinel element for `interruptible-sleep`
  * isleep-b      - list: 0/1 sentinel element for `interruptible-sleep`
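
As a rough illustration of the key naming scheme above, the helper below builds a full Redis key from a queue name and one of the listed key names. `qkey-sketch` is a hypothetical name used only for this example; the library may build its keys differently internally.

```clojure
(require '[clojure.string :as str])

(defn qkey-sketch
  "Hypothetical helper: joins the `carmine:mq` prefix, the queue name, and
  a key name with colons, following the naming scheme documented above."
  [qname k]
  (str/join ":" ["carmine" "mq" (name qname) (name k)]))

(qkey-sketch "my-queue" :messages)   ; => "carmine:mq:my-queue:messages"
(qkey-sketch "my-queue" :mid-circle) ; => "carmine:mq:my-queue:mid-circle"
```

Knowing the full key names can help when inspecting a queue by hand with `redis-cli`.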

default-throttle-ms-fn

(default-throttle-ms-fn queue-size)
Default/example (fn [queue-size]) -> ?throttle-msecs
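
A custom throttle fn with the same shape might look like the sketch below. The name `my-throttle-ms` and all thresholds are arbitrary choices for illustration, and returning nil is assumed to mean "don't sleep", as the `?` in `?throttle-msecs` suggests.

```clojure
(defn my-throttle-ms
  "Example (fn [queue-size]) -> ?throttle-msecs.
  Polls faster as the backlog grows. Thresholds are illustrative only."
  [queue-size]
  (cond
    (> queue-size 1000) nil ; large backlog: assumed to mean no sleep
    (> queue-size 100)  10  ; moderate backlog: short sleep
    :else               100 ; near-empty queue: longer sleep
    ))

(my-throttle-ms 5000) ; => nil
(my-throttle-ms 500)  ; => 10
(my-throttle-ms 0)    ; => 100
```

Such a fn would be passed to `worker` as the `:throttle-ms` option.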

enqueue

(enqueue qname message)
(enqueue qname message {:keys [init-backoff-ms lock-ms mid can-update? can-requeue?]})
Pushes given message (any Clojure data type) to named queue and returns
a map with keys: [success? mid action error].

When `success?` is true:  `mid`, `action` will be present, with
                          `action` e/o #{:added :updated}.

When `success?` is false: `error` will be present, with
                          `error` e/o #{:already-queued :locked :backoff}.

Options:
    :init-backoff-ms - Optional initial backoff in msecs.
    :lock-ms         - Optional lock time in msecs. When unspecified, the
                       worker's default lock time will be used.

    :mid             - Optional unique message id (e.g. message hash) to
                       identify a specific message for dedupe/update/requeue.
                       When unspecified, a random uuid will be used.

    :can-update?     - When true, will update message content and/or lock-ms for
                       an mid still awaiting handling.
    :can-requeue?    - When true, a message that currently has `:locked` or
                       `:done-with-backoff` status will be marked so that it
                       is automatically requeued after garbage collection.
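
The sketch below shows one way to act on the result map described above. `describe-enqueue-result` is a hypothetical helper, not part of the library. The commented section assumes that `enqueue` is called within Carmine's `wcar` macro, and that `conn-opts`, the queue name, and the mid are placeholders; it needs the Carmine dependency and a reachable Redis server.

```clojure
(defn describe-enqueue-result
  "Hypothetical helper: summarizes the map returned by `enqueue`."
  [{:keys [success? mid action error]}]
  (if success?
    [:ok action mid]     ; action is one of #{:added :updated}
    [:rejected error]))  ; error is one of #{:already-queued :locked :backoff}

(describe-enqueue-result {:success? true :mid "order-42" :action :added})
;; => [:ok :added "order-42"]
(describe-enqueue-result {:success? false :error :already-queued})
;; => [:rejected :already-queued]

(comment
  ;; Not evaluated on load: requires Carmine and a running Redis server.
  (require '[taoensso.carmine :as car]
           '[taoensso.carmine.message-queue :as mq])

  (def conn-opts {:pool {} :spec {:uri "redis://localhost:6379"}}) ; placeholder

  (describe-enqueue-result
    (car/wcar conn-opts
      (mq/enqueue "my-queue" {:order-id 42}
        {:mid "order-42" :can-update? true}))))
```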

exp-backoff

(exp-backoff n-attempt)
(exp-backoff n-attempt {:keys [min max factor], :or {factor 1000}})
Returns binary exponential backoff value for n<=36.
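
To make the idea concrete, here is a deterministic sketch of binary exponential backoff with the same option names. It is not the library's implementation: the exact formula isn't given in this document, so the scale (`factor * 2^n`) and the absence of random jitter are assumptions. `exp-backoff-sketch` is a hypothetical name.

```clojure
(defn exp-backoff-sketch
  "Sketch of binary exponential backoff: factor * 2^n, with n capped at 36
  and the result clamped to [min, max] when those options are given.
  Assumed formula; the real fn may differ (e.g. by adding jitter)."
  ([n-attempt] (exp-backoff-sketch n-attempt nil))
  ([n-attempt {:keys [min max factor] :or {factor 1000}}]
   ;; `min`/`max` are shadowed by the options, so core fns are fully qualified.
   (let [n (clojure.core/min 36 (long n-attempt))
         t (* (long factor) (bit-shift-left 1 n))]
     (cond-> t
       min (clojure.core/max min)
       max (clojure.core/min max)))))

(exp-backoff-sketch 3)               ; => 8000
(exp-backoff-sketch 10 {:max 60000}) ; => 60000
(exp-backoff-sketch 0  {:min 5000})  ; => 5000
```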

IWorker

protocol

Implementation detail.

members

start

(start this)

stop

(stop this)

lua-dequeue_

lua-enqueue_

lua-msg-status_

message-status

(message-status qname mid)
Returns current message status, e/o:
nil                  - Not in queue or already GC'd
:queued              - Awaiting handler
:queued-with-backoff - Awaiting handler, but skip until backoff expired
:locked              - Currently with handler
:locked-with-requeue - Currently with handler, will requeue when done
:done-awaiting-gc    - Finished handling, awaiting GC
:done-with-backoff   - Finished handling, awaiting GC,
                       but skip until dedupe backoff expired
:done-with-requeue   - Will requeue, but skip until dedupe backoff expired

monitor-fn

(monitor-fn qname max-queue-size warn-backoff-ms)
Returns a worker monitor fn that warns when queue exceeds the prescribed
size. A backoff timeout can be provided to rate-limit this warning.
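
The behaviour described above can be sketched as a hand-written monitor. This is a hypothetical stand-in, not the library's `monitor-fn`: the name `monitor-sketch`, the injected `warn-fn` and `now-fn` arguments, and the rate-limiting logic are all assumptions made for illustration.

```clojure
(defn monitor-sketch
  "Hypothetical rate-limited monitor. Returns a (fn [{:keys [queue-size]}])
  that calls `warn-fn` when queue-size exceeds `max-queue-size`, at most
  once per `warn-backoff-ms`. `now-fn` returns the current time in msecs."
  [max-queue-size warn-backoff-ms warn-fn now-fn]
  (let [last-warned_ (atom nil)] ; time of the most recent warning, or nil
    (fn [{:keys [queue-size]}]
      (when (> queue-size max-queue-size)
        (let [now  (now-fn)
              prev @last-warned_]
          ;; Warn only if no warning was issued within the backoff window.
          ;; compare-and-set! ensures a single winner across threads.
          (when (and (or (nil? prev) (>= (- now prev) warn-backoff-ms))
                     (compare-and-set! last-warned_ prev now))
            (warn-fn queue-size)
            nil))))))

(def my-monitor
  (monitor-sketch 1000 (* 6 60 60 1000)
    #(println "Queue size warning:" %)
    #(System/currentTimeMillis)))
```

A fn like `my-monitor` has the shape expected by the worker's `:monitor` option.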

queue-content

(queue-content conn-opts qname)
Returns detailed {<mid> {:keys [message status ...]}} map for every
message currently in queue.

O(n_mids) and expensive, avoid use in production.

queue-names

(queue-names conn-opts)
(queue-names conn-opts pattern)
Returns a non-empty set of existing queue names, or nil.

queue-size

(queue-size conn-opts qname)
Returns in O(1) the approx number of messages awaiting handler for
given named queue. Same as (:nqueued (queue-status conn-opts qname)).

queue-status

(queue-status conn-opts qname)
Returns in O(1) the approx {:keys [nqueued nlocked nbackoff ntotal]}
counts for given named queue.

`nlocked` and `nbackoff` may include expired entries!

queues-clear!!

(queues-clear!! conn-opts qnames)
Permanently deletes ALL content for the Carmine message queues with
given names.

Returns nil, or a non-empty vector of the queue names that were cleared.

queues-clear-all!!!

(queues-clear-all!!! conn-opts)
**DANGER**!
Permanently deletes ALL content for *ALL* Carmine message queues.
Returns nil, or a non-empty vector of the queue names that were cleared.

set-min-log-level!

(set-min-log-level! level)
Sets Timbre's minimum log level for internal Carmine message queue namespaces.
Possible levels: #{:trace :debug :info :warn :error :fatal :report}.
Default level: `:info`.

worker

(worker conn-opts qname)
(worker conn-opts qname
  {:keys [handler monitor lock-ms eoq-backoff-ms throttle-ms auto-start
          nthreads-worker nthreads-handler],
   :as worker-opts,
   :or {handler          (fn [m] (timbre/info m) {:status :success}),
        monitor          (monitor-fn qname 1000 (enc/ms :hours 6)),
        lock-ms          (enc/ms :mins 60),
        nthreads-worker  1,
        nthreads-handler 1,
        throttle-ms      :auto,
        eoq-backoff-ms   exp-backoff,
        auto-start       true}})
Returns a stateful threaded CarmineMessageQueueWorker to handle messages
added to named queue with `enqueue`.

API:
  - (deref <worker>)          => Status map, {:keys [running? nthreads stats ...]}.
  - (<worker> :start)         => Same as calling (start <worker>).
  - (<worker> :stop)          => Same as calling (stop  <worker>).
  - (<worker> :queue-size)    => Same as calling `queue-size`    for given qname.
  - (<worker> :queue-status)  => Same as calling `queue-status`  for given qname.
  - (<worker> :queue-content) => Same as calling `queue-content` for given qname.

Options:
  :handler          - (fn [{:keys [qname mid message attempt]}]) that throws
                      or returns {:status     <#{:success :error :retry}>
                                  :throwable  <Throwable>
                                  :backoff-ms <retry-or-dedupe-backoff-ms>}.
  :monitor          - (fn [{:keys [queue-size ndry-runs poll-reply]}])
                      called on each worker loop iteration. Useful for queue
                      monitoring/logging. See also `monitor-fn`.
  :lock-ms          - Default time that a handler may keep a message before the
                      handler is considered fatally stalled and the message is
                      re-queued. Must be sufficiently high to prevent double
                      handling. Can be overridden on a per-message basis via
                      `enqueue`.

  :throttle-ms      - Thread sleep period between each poll.
                      Can be a (fn [queue-size]) -> ?sleep-msecs,
                      or :auto (to use `default-throttle-ms-fn`).

  :eoq-backoff-ms   - Max msecs to sleep thread each time end of queue is reached.
                      Can be a (fn [ndry-runs]) -> msecs for n<=5.
                      Sleep may be interrupted when new messages are enqueued.
                      If present, connection read timeout should be >= max msecs.

  :nthreads-worker  - Number of threads to monitor and maintain queue.
  :nthreads-handler - Number of threads to handle queue messages with handler fn.
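
A minimal sketch of a handler and worker using the options above. `my-handler`, its retry policy (retry with a 5s backoff for the first two attempts), and the assumption that `attempt` counts from 1 are illustrative choices. The commented section needs the Carmine dependency and a reachable Redis server; `conn-opts` and the queue name are placeholders.

```clojure
(defn my-handler
  "Example handler returning a status map in the shape described above."
  [{:keys [qname mid message attempt]}]
  (try
    (when-not (map? message)
      (throw (ex-info "Unexpected message type" {:qname qname :mid mid})))
    ;; ... real work with `message` would go here ...
    {:status :success}
    (catch Exception e
      (if (< attempt 3)
        {:status :retry :backoff-ms 5000}  ; try again after a backoff
        {:status :error :throwable e}))))  ; give up

(my-handler {:qname "my-queue" :mid "m1" :message {:a 1} :attempt 1})
;; => {:status :success}
(my-handler {:qname "my-queue" :mid "m2" :message "bad" :attempt 1})
;; => {:status :retry, :backoff-ms 5000}

(comment
  ;; Not evaluated on load: requires Carmine and a running Redis server.
  (require '[taoensso.carmine.message-queue :as mq])

  (def conn-opts {:pool {} :spec {:uri "redis://localhost:6379"}}) ; placeholder

  (def my-worker
    (mq/worker conn-opts "my-queue"
      {:handler          my-handler
       :lock-ms          (* 5 60 1000) ; 5 mins
       :nthreads-handler 2}))

  @my-worker              ; status map
  (my-worker :queue-size) ; same as calling `queue-size` for "my-queue"
  (my-worker :stop))
```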

worker?

(worker? x)