Class: CDC::Concurrent::ProcessorPool

Inherits:
Object
Defined in:
lib/cdc/concurrent/processor_pool.rb

Overview

Executes one concurrent-safe processor using Async tasks.

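Instance Method Summary

  • #initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ ProcessorPool
  • #process(event) ⇒ CDC::Core::ProcessorResult
  • #process_many(events) ⇒ Array<CDC::Core::ProcessorResult>
  • #shutdown ⇒ void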

Constructor Details

#initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true) ⇒ ProcessorPool

Returns a new instance of ProcessorPool.

Parameters:

  • processor (CDC::Core::Processor)
  • concurrency (Integer) (defaults to: 100)
  • timeout (Float, nil) (defaults to: nil)
  • preserve_order (Boolean) (defaults to: true)


# File 'lib/cdc/concurrent/processor_pool.rb', line 11

def initialize(processor:, concurrency: 100, timeout: nil, preserve_order: true)
  validate_processor!(processor)

  @processor = processor
  @configuration = Configuration.new(concurrency:, timeout:, preserve_order:)
  @shutdown = false
end
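The constructor forwards its keyword arguments with Ruby's value-omission shorthand (Ruby 3.1 and later), where `concurrency:` stands for `concurrency: concurrency`. A minimal sketch of that forwarding, assuming a hypothetical `Settings` struct in place of the pool's `Configuration` class, whose definition is not shown here:

```ruby
# Hypothetical stand-in for the pool's Configuration class.
Settings = Struct.new(:concurrency, :timeout, :preserve_order, keyword_init: true)

# Hypothetical helper that mirrors the constructor's keyword defaults.
def build_settings(concurrency: 100, timeout: nil, preserve_order: true)
  # `concurrency:` with no value is shorthand for `concurrency: concurrency`.
  Settings.new(concurrency:, timeout:, preserve_order:)
end

defaults = build_settings
custom = build_settings(concurrency: 10, timeout: 2.5, preserve_order: false)
```

Here `defaults` carries the documented defaults (100, nil, true), while `custom` overrides all three.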

Instance Method Details

#process(event) ⇒ CDC::Core::ProcessorResult

Parameters:

  • event (CDC::Core::ChangeEvent)

Returns:

  • (CDC::Core::ProcessorResult)

Raises:

  • (ShutdownError) if the pool has already been shut down

# File 'lib/cdc/concurrent/processor_pool.rb', line 21

def process(event)
  raise ShutdownError, "processor pool has been shut down" if @shutdown

  process_one(event)
end

#process_many(events) ⇒ Array<CDC::Core::ProcessorResult>

Parameters:

  • events (Array<CDC::Core::ChangeEvent>)

Returns:

  • (Array<CDC::Core::ProcessorResult>)

Raises:

  • (ShutdownError) if the pool has already been shut down

# File 'lib/cdc/concurrent/processor_pool.rb', line 29

def process_many(events)
  raise ShutdownError, "processor pool has been shut down" if @shutdown
  return empty_results if events.empty?

  # @type var indexed_results: Array[[Integer, CDC::Core::ProcessorResult]]
  indexed_results = []

  process_batch(events, indexed_results)

  indexed_results.sort_by!(&:first) if @configuration.preserve_order
  indexed_results.map(&:last).freeze
end
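The index-then-sort step in process_many can be illustrated in isolation. The sketch below is not the library's implementation: it swaps Async tasks for plain stdlib threads and uses a hypothetical `collect_results` helper, purely to show how tagging each result with its input index lets results gathered in completion order be restored to input order.

```ruby
# Hypothetical helper, not part of CDC::Concurrent. Plain threads stand in
# for the Async tasks the real pool uses.
def collect_results(items, preserve_order: true)
  indexed_results = []
  lock = Mutex.new

  threads = items.each_with_index.map do |item, index|
    Thread.new do
      # Later items sleep less, so they tend to finish first.
      sleep((items.size - index) * 0.01)
      result = yield(item)
      # Record the input position alongside the result.
      lock.synchronize { indexed_results << [index, result] }
    end
  end
  threads.each(&:join)

  # Same final steps as process_many: sort by index, drop it, freeze.
  indexed_results.sort_by!(&:first) if preserve_order
  indexed_results.map(&:last).freeze
end

# With preserve_order: true the results come back in input order.
ordered = collect_results(%w[a b c]) { |item| item.upcase }
```

With `preserve_order: false` the sort is skipped, so the returned array reflects whatever order the tasks happened to finish in.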

#shutdown ⇒ void

This method returns an undefined value.



# File 'lib/cdc/concurrent/processor_pool.rb', line 43

def shutdown
  @shutdown = true
end
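As the source shows, shutdown only sets a flag; the guard at the top of process and process_many is what rejects later calls. A minimal sketch of that interaction, using a hypothetical `GuardedPool` class and assuming `ShutdownError` descends from `StandardError` (its definition is not shown on this page):

```ruby
# Assumption: the real ShutdownError's superclass is not shown in the docs.
class ShutdownError < StandardError; end

# Hypothetical stand-in for the pool; only the shutdown guard is modelled.
class GuardedPool
  def initialize
    @shutdown = false
  end

  def process(event)
    raise ShutdownError, "processor pool has been shut down" if @shutdown

    "processed #{event}"
  end

  def shutdown
    @shutdown = true
  end
end

pool = GuardedPool.new
before = pool.process(:insert)

pool.shutdown
pool.shutdown # a second call just sets the flag again

after =
  begin
    pool.process(:update)
  rescue ShutdownError => e
    e.message
  end
```

Callers that may race with shutdown can rescue the error as above rather than checking pool state first.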