Class: Mammoth::ConcurrentDeliveryRuntime

Inherits:
Object
Defined in:
lib/mammoth/concurrent_delivery_runtime.rb

Overview

Delivery runtime backed by CDC::Concurrent::ProcessorPool.

Mammoth keeps a single upstream replication stream and delegates downstream I/O fan-out to cdc-concurrent. This class is intentionally small so the runtime boundary remains easy to test and replace.
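
Because the boundary is only two public methods, a stand-in runtime can be swapped in for tests. The following is a minimal sketch, not part of Mammoth: `InlineDeliveryRuntime` and `UpcaseProcessor` are hypothetical names, and it assumes the processor's `#process` takes one work item and returns one result.

```ruby
# Hypothetical stand-in for tests; not part of Mammoth.
# It mirrors the two public methods documented on this page and runs
# every item sequentially on the calling thread.
class InlineDeliveryRuntime
  attr_reader :processor

  # processor: any object responding to #process
  # (assumption: one item in, one result out)
  def initialize(processor:)
    @processor = processor
  end

  # Same contract as the documented #process_many: Array in, Array out.
  def process_many(items)
    return [] if items.empty?

    items.map { |item| processor.process(item) }
  end

  # Nothing to release in the inline case; returns nil like the original.
  def shutdown
    nil
  end
end

# Hypothetical processor used only for illustration.
class UpcaseProcessor
  def process(item)
    item.to_s.upcase
  end
end

runtime = InlineDeliveryRuntime.new(processor: UpcaseProcessor.new)
results = runtime.process_many(%w[insert update delete])
runtime.shutdown
```

Code that depends only on `#process_many` and `#shutdown` should work with either runtime, which is the point of keeping the class small.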

Constructor Details

#initialize(processor:, concurrency:, timeout:, preserve_order:) ⇒ ConcurrentDeliveryRuntime

Returns a new instance of ConcurrentDeliveryRuntime.

Parameters:

  • processor (#process)

    delivery processor

  • concurrency (Integer)

    number of concurrent delivery workers

  • timeout (Numeric, nil)

    optional per-item timeout

  • preserve_order (Boolean)

    preserve output order when supported



# File 'lib/mammoth/concurrent_delivery_runtime.rb', line 16

def initialize(processor:, concurrency:, timeout:, preserve_order:)
  @processor = processor
  @concurrency = concurrency
  @timeout = timeout
  @preserve_order = preserve_order
  @pool = build_pool
end

Instance Attribute Details

#concurrency ⇒ Object (readonly)

Returns the number of concurrent delivery workers.



# File 'lib/mammoth/concurrent_delivery_runtime.rb', line 10

def concurrency
  @concurrency
end

#pool ⇒ Object (readonly)

Returns the underlying processor pool built during initialization.



# File 'lib/mammoth/concurrent_delivery_runtime.rb', line 10

def pool
  @pool
end

#preserve_order ⇒ Object (readonly)

Returns whether output order is preserved when supported.



# File 'lib/mammoth/concurrent_delivery_runtime.rb', line 10

def preserve_order
  @preserve_order
end

#processor ⇒ Object (readonly)

Returns the delivery processor.



# File 'lib/mammoth/concurrent_delivery_runtime.rb', line 10

def processor
  @processor
end

#timeout ⇒ Object (readonly)

Returns the optional per-item timeout, or nil when none is set.



# File 'lib/mammoth/concurrent_delivery_runtime.rb', line 10

def timeout
  @timeout
end

Instance Method Details

#process_many(items) ⇒ Array<Object>

Processes a batch of work items through cdc-concurrent. An empty batch returns an empty array without calling the pool.

Parameters:

  • items (Array<Object>)

    CDC work units

Returns:

  • (Array<Object>)

    processor results



# File 'lib/mammoth/concurrent_delivery_runtime.rb', line 28

def process_many(items)
  return [] if items.empty?

  pool.process_many(items)
end
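
The empty-batch guard can be checked with a recording double. This is a sketch under stated assumptions: `RecordingPool` and `GuardedRuntime` are hypothetical names, and `GuardedRuntime` reproduces only the `#process_many` body shown above rather than the real class or the real cdc-concurrent pool.

```ruby
# Hypothetical pool double that records every batch it receives.
class RecordingPool
  attr_reader :batches

  def initialize
    @batches = []
  end

  def process_many(items)
    @batches << items
    items.map { |item| "delivered:#{item}" }
  end
end

# Hypothetical wrapper reproducing only the #process_many body shown above.
class GuardedRuntime
  attr_reader :pool

  def initialize(pool)
    @pool = pool
  end

  def process_many(items)
    return [] if items.empty?

    pool.process_many(items)
  end
end

pool = RecordingPool.new
runtime = GuardedRuntime.new(pool)

empty_result = runtime.process_many([])      # short-circuits; pool is not called
batch_result = runtime.process_many([1, 2])  # delegated to the pool
```

After both calls the double has recorded a single batch, so an empty input never reaches the pool.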

#shutdown ⇒ nil

Shuts down the underlying pool if it responds to #shutdown.

Returns:

  • (nil)


# File 'lib/mammoth/concurrent_delivery_runtime.rb', line 37

def shutdown
  pool.shutdown if pool.respond_to?(:shutdown)
  nil
end
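
The `respond_to?` guard means `#shutdown` is safe to call whether or not the pool can be shut down. A minimal sketch of that behavior follows; `StoppablePool`, `BarePool`, and `ShutdownOnly` are hypothetical names used for illustration, and `ShutdownOnly` copies only the method body shown above.

```ruby
# Hypothetical pool double that supports shutdown.
class StoppablePool
  attr_reader :stopped

  def initialize
    @stopped = false
  end

  def shutdown
    @stopped = true
  end
end

# Hypothetical pool double with no shutdown method at all.
class BarePool
end

# Hypothetical wrapper reproducing only the #shutdown body shown above.
class ShutdownOnly
  attr_reader :pool

  def initialize(pool)
    @pool = pool
  end

  def shutdown
    pool.shutdown if pool.respond_to?(:shutdown)
    nil
  end
end

stoppable = StoppablePool.new
with_shutdown = ShutdownOnly.new(stoppable).shutdown        # pool is stopped
without_shutdown = ShutdownOnly.new(BarePool.new).shutdown  # no-op, no error
```

Both calls return nil, so callers cannot tell from the return value whether the pool actually had a shutdown hook.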