Class: Mammoth::DeliveryProcessor

Inherits:
Object
  • Object
show all
Defined in:
lib/mammoth/delivery_processor.rb

Overview

Adapter object used by CDC::Concurrent::ProcessorPool.

The processor keeps cdc-concurrent integration narrow: cdc-concurrent owns I/O-heavy fan-out mechanics, while DeliveryWorker owns Mammoth relay semantics such as retries, dead letters, and checkpoint writes.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(delivery_worker:, delivery_unit: :event) ⇒ DeliveryProcessor

Returns a new instance of DeliveryProcessor.

Parameters:

  • delivery_worker (Mammoth::DeliveryWorker)

    relay-aware delivery worker

  • delivery_unit (String, Symbol) (defaults to: :event)

    event or transaction



39
40
41
42
# File 'lib/mammoth/delivery_processor.rb', line 39

def initialize(delivery_worker:, delivery_unit: :event)
  @delivery_worker = delivery_worker
  @delivery_unit = delivery_unit.to_sym
end

Instance Attribute Details

#delivery_unitObject (readonly)

Returns the value of attribute delivery_unit.



35
36
37
# File 'lib/mammoth/delivery_processor.rb', line 35

def delivery_unit
  @delivery_unit
end

#delivery_workerObject (readonly)

Returns the value of attribute delivery_worker.



35
36
37
# File 'lib/mammoth/delivery_processor.rb', line 35

def delivery_worker
  @delivery_worker
end

Class Method Details

.concurrent_safeBoolean

Returns true when this processor has explicitly opted in to cdc-concurrent execution.

Returns:

  • (Boolean)

    true when this processor has explicitly opted in to cdc-concurrent execution.



30
31
32
# File 'lib/mammoth/delivery_processor.rb', line 30

def concurrent_safe?
  @concurrent_safe == true
end

.concurrent_safe!true

Mark this processor as safe for CDC::Concurrent::ProcessorPool.

DeliveryProcessor itself is intentionally stateless after initialization; per-delivery retry, dead-letter, and checkpoint behavior remains owned by the injected DeliveryWorker.

Returns:

  • (true)


20
21
22
# File 'lib/mammoth/delivery_processor.rb', line 20

def concurrent_safe!
  @concurrent_safe = true
end

.concurrent_safe?Boolean

Returns true when this processor has explicitly opted in to cdc-concurrent execution.

Returns:

  • (Boolean)

    true when this processor has explicitly opted in to cdc-concurrent execution.



26
27
28
# File 'lib/mammoth/delivery_processor.rb', line 26

def concurrent_safe?
  @concurrent_safe == true
end

Instance Method Details

#call(work) ⇒ Hash

Process one work item from CDC::Concurrent::ProcessorPool.

Parameters:

  • work (Object)

    event or transaction envelope

Returns:

  • (Hash)

    delivery summary



55
56
57
# File 'lib/mammoth/delivery_processor.rb', line 55

def call(work)
  process(work)
end

#concurrent_safe?Boolean Also known as: concurrent_safe

Returns true when this processor instance is safe for concurrent execution.

Returns:

  • (Boolean)

    true when this processor instance is safe for concurrent execution.



45
46
47
# File 'lib/mammoth/delivery_processor.rb', line 45

def concurrent_safe?
  self.class.concurrent_safe?
end

#process(work) ⇒ Object



59
60
61
62
63
64
65
66
# File 'lib/mammoth/delivery_processor.rb', line 59

def process(work)
  case delivery_unit
  when :transaction
    delivery_worker.deliver_transaction(work)
  else
    delivery_worker.deliver(work)
  end
end