Class: Mammoth::Application

Inherits:
Object
  • Object
show all
Defined in:
lib/mammoth/application.rb

Overview

Top-level Mammoth application runtime.

Application wires Mammoth's delivery-side runtime pieces: configuration, SQLite operational memory, replication consumer, delivery worker, checkpoint store, dead letter store, and webhook sink. Upstream PostgreSQL transport composition stays outside this class so the application runtime consumes an injected CDC work source rather than owning upstream CDC source-adapter lifecycle decisions.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config, source: nil, sink: nil, sleeper: Kernel.method(:sleep)) ⇒ Application

Returns a new instance of Application.

Parameters:

  • config (Mammoth::Configuration)

    loaded configuration

  • source (#each, nil) (defaults to: nil)

    injectable event source for tests and demos

  • sink (#deliver, nil) (defaults to: nil)

    optional destination sink

  • sleeper (#call) (defaults to: Kernel.method(:sleep))

    retry sleep strategy



19
20
21
22
23
24
# File 'lib/mammoth/application.rb', line 19

def initialize(config, source: nil, sink: nil, sleeper: Kernel.method(:sleep))
  @config = config
  @sqlite_store = SQLiteStore.connect(config.dig("sqlite", "path")).bootstrap!
  @consumer = ReplicationConsumer.new(source: source, delivery_unit: delivery_unit)
  @delivery_worker = build_delivery_worker(sink: sink || WebhookSink.from_config(config), sleeper: sleeper)
end

Instance Attribute Details

#configObject (readonly)

Returns the value of attribute config.



13
14
15
# File 'lib/mammoth/application.rb', line 13

def config
  @config
end

#consumerObject (readonly)

Returns the value of attribute consumer.



13
14
15
# File 'lib/mammoth/application.rb', line 13

def consumer
  @consumer
end

#delivery_workerObject (readonly)

Returns the value of attribute delivery_worker.



13
14
15
# File 'lib/mammoth/application.rb', line 13

def delivery_worker
  @delivery_worker
end

#sqlite_storeObject (readonly)

Returns the value of attribute sqlite_store.



13
14
15
# File 'lib/mammoth/application.rb', line 13

def sqlite_store
  @sqlite_store
end

Instance Method Details

#startInteger

Start the application runtime and deliver consumed CDC work.

Returns:

  • (Integer)

    number of processed work units



29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/mammoth/application.rb', line 29

def start
  runtime = build_runtime
  processed = 0

  consumer.start do |work|
    process_work(runtime, work)
    processed += 1
  end

  processed
ensure
  runtime&.shutdown if runtime.respond_to?(:shutdown)
end