Class: Mammoth::DeliveryWorker

Inherits:
Object
  • Object
show all
Defined in:
lib/mammoth/delivery_worker.rb

Overview

Delivers normalized events with retry, checkpoint, and dead-letter handling.

DeliveryWorker is Mammoth's first reliable delivery unit. It intentionally keeps the delivery contract small: attempt webhook delivery, advance the checkpoint after success, and persist the failed event to the dead letter queue after retry exhaustion.

Constant Summary collapse

DEFAULT_SOURCE =

Default source name used when an event does not provide one.

"postgresql"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(sink:, checkpoint_store:, dead_letter_store:, source_name:, slot_name:, publication_name:, max_attempts:, retry_schedule:, sleeper: Kernel.method(:sleep)) ⇒ DeliveryWorker

Returns a new instance of DeliveryWorker.

Parameters:

  • sink (#deliver)

    destination sink

  • checkpoint_store (Mammoth::CheckpointStore)

    checkpoint persistence

  • dead_letter_store (Mammoth::DeadLetterStore)

    dead letter persistence

  • source_name (String)

    logical source name

  • slot_name (String)

    replication slot name

  • publication_name (String)

    publication name

  • max_attempts (Integer)

    maximum delivery attempts

  • retry_schedule (Array<Integer>)

    retry wait schedule in seconds

  • sleeper (#call) (defaults to: Kernel.method(:sleep))

    sleep strategy, injectable for tests



26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/mammoth/delivery_worker.rb', line 26

def initialize(sink:, checkpoint_store:, dead_letter_store:, source_name:, slot_name:, publication_name:,
               max_attempts:, retry_schedule:, sleeper: Kernel.method(:sleep))
  @sink = sink
  @checkpoint_store = checkpoint_store
  @dead_letter_store = dead_letter_store
  @source_name = source_name
  @slot_name = slot_name
  @publication_name = publication_name
  @max_attempts = max_attempts
  @retry_schedule = retry_schedule
  @sleeper = sleeper
end

Instance Attribute Details

#checkpoint_storeObject (readonly)

Returns the value of attribute checkpoint_store.



14
15
16
# File 'lib/mammoth/delivery_worker.rb', line 14

def checkpoint_store
  @checkpoint_store
end

#dead_letter_storeObject (readonly)

Returns the value of attribute dead_letter_store.



14
15
16
# File 'lib/mammoth/delivery_worker.rb', line 14

def dead_letter_store
  @dead_letter_store
end

#max_attemptsObject (readonly)

Returns the value of attribute max_attempts.



14
15
16
# File 'lib/mammoth/delivery_worker.rb', line 14

def max_attempts
  @max_attempts
end

#publication_nameObject (readonly)

Returns the value of attribute publication_name.



14
15
16
# File 'lib/mammoth/delivery_worker.rb', line 14

def publication_name
  @publication_name
end

#retry_scheduleObject (readonly)

Returns the value of attribute retry_schedule.



14
15
16
# File 'lib/mammoth/delivery_worker.rb', line 14

def retry_schedule
  @retry_schedule
end

#sinkObject (readonly)

Returns the value of attribute sink.



14
15
16
# File 'lib/mammoth/delivery_worker.rb', line 14

def sink
  @sink
end

#sleeperObject (readonly)

Returns the value of attribute sleeper.



14
15
16
# File 'lib/mammoth/delivery_worker.rb', line 14

def sleeper
  @sleeper
end

#slot_nameObject (readonly)

Returns the value of attribute slot_name.



14
15
16
# File 'lib/mammoth/delivery_worker.rb', line 14

def slot_name
  @slot_name
end

#source_nameObject (readonly)

Returns the value of attribute source_name.



14
15
16
# File 'lib/mammoth/delivery_worker.rb', line 14

def source_name
  @source_name
end

Class Method Details

.from_config(config, sink:, checkpoint_store:, dead_letter_store:, sleeper: Kernel.method(:sleep)) ⇒ Mammoth::DeliveryWorker

Build a delivery worker from Mammoth configuration and stores.

Parameters:

Returns:



47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/mammoth/delivery_worker.rb', line 47

def self.from_config(config, sink:, checkpoint_store:, dead_letter_store:, sleeper: Kernel.method(:sleep))
  new(
    sink: sink,
    checkpoint_store: checkpoint_store,
    dead_letter_store: dead_letter_store,
    source_name: config.dig("mammoth", "name"),
    slot_name: config.dig("replication", "slot"),
    publication_name: Array(config.dig("replication", "publications")).join(","),
    max_attempts: config.dig("retry", "max_attempts"),
    retry_schedule: config.dig("retry", "schedule_seconds"),
    sleeper: sleeper
  )
end

Instance Method Details

#deliver(event) ⇒ Hash

Deliver an event with retry, checkpoint, and DLQ handling.

Parameters:

  • event (Hash, #to_h)

    normalized event

Returns:

  • (Hash)

    delivery summary



73
74
75
# File 'lib/mammoth/delivery_worker.rb', line 73

def deliver(event)
  deliver_work(event, serializer: EventSerializer, delivery_method: :deliver)
end

#deliver_transaction(envelope) ⇒ Hash

Deliver a transaction envelope with retry, checkpoint, and DLQ handling.

Parameters:

  • envelope (#events, #transaction_id)

    CDC transaction envelope

Returns:

  • (Hash)

    delivery summary



65
66
67
# File 'lib/mammoth/delivery_worker.rb', line 65

def deliver_transaction(envelope)
  deliver_work(envelope, serializer: TransactionEnvelopeSerializer, delivery_method: :deliver_transaction)
end