Class: CDC::Parallel::Runtime

Inherits:
Object
  • Object
show all
Defined in:
lib/cdc/parallel/runtime.rb

Overview

High-level Ractor runtime facade for cdc-core processors.

Instance Method Summary collapse

Constructor Details

#initialize(processor:, size: Etc.nprocessors, timeout: nil) ⇒ void

Parameters:

  • processor (CDC::Core::Processor)
  • size (Integer) (defaults to: Etc.nprocessors)
  • timeout (Float, nil) (defaults to: nil)


11
12
13
14
15
16
# File 'lib/cdc/parallel/runtime.rb', line 11

def initialize(processor:, size: Etc.nprocessors, timeout: nil)
  @processor_pool = ProcessorPool.new(processor:, size:, timeout:)
  @transaction_pool = TransactionPool.new(processor:, size:, timeout:)
  @router = Router.new(processor_pool: @processor_pool, transaction_pool: @transaction_pool)
  @shutdown = false
end

Instance Method Details

#process(item) ⇒ CDC::Core::ProcessorResult

Process a ChangeEvent or TransactionEnvelope.

Parameters:

  • item (CDC::Core::ChangeEvent, CDC::Core::TransactionEnvelope)

Returns:

  • (CDC::Core::ProcessorResult)

Raises:



22
23
24
25
26
# File 'lib/cdc/parallel/runtime.rb', line 22

def process(item)
  raise ShutdownError, "runtime has been shut down" if @shutdown

  @router.process(item)
end

#process_transaction(transaction) ⇒ CDC::Core::ProcessorResult

Alias for transaction-oriented processing.

Parameters:

  • transaction (CDC::Core::TransactionEnvelope)

Returns:

  • (CDC::Core::ProcessorResult)


32
33
34
# File 'lib/cdc/parallel/runtime.rb', line 32

def process_transaction(transaction)
  process(transaction)
end

#shutdownvoid

This method returns an undefined value.

Shut down all runtime resources.



39
40
41
42
43
44
45
# File 'lib/cdc/parallel/runtime.rb', line 39

def shutdown
  return if @shutdown

  @shutdown = true
  @processor_pool.shutdown
  @transaction_pool.shutdown
end