Class: Whodunit::Chronicles::Service

Inherits:
Object
  • Object
show all
Defined in:
lib/whodunit/chronicles/service.rb

Overview

Main service orchestrator for audit streaming

Coordinates the stream adapter and audit processor to provide a complete audit streaming solution with error handling and monitoring.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(adapter: nil, processor: nil, logger: Chronicles.logger) ⇒ Service

Returns a new instance of Service.



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/whodunit/chronicles/service.rb', line 14

def initialize(
  adapter: nil,
  processor: nil,
  logger: Chronicles.logger
)
  @adapter = adapter || build_adapter
  @processor = processor || AuditProcessor.new(logger: logger)
  @logger = logger
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: 4,
    max_queue: 100,
    fallback_policy: :caller_runs,
  )
  @running = false
  @retry_count = 0
end

Instance Attribute Details

#adapterObject (readonly)

Returns the value of attribute adapter.



12
13
14
# File 'lib/whodunit/chronicles/service.rb', line 12

def adapter
  @adapter
end

#executorObject (readonly)

Returns the value of attribute executor.



12
13
14
# File 'lib/whodunit/chronicles/service.rb', line 12

def executor
  @executor
end

#loggerObject (readonly)

Returns the value of attribute logger.



12
13
14
# File 'lib/whodunit/chronicles/service.rb', line 12

def logger
  @logger
end

#processorObject (readonly)

Returns the value of attribute processor.



12
13
14
# File 'lib/whodunit/chronicles/service.rb', line 12

def processor
  @processor
end

Instance Method Details

#build_adapterObject (private)



118
119
120
121
122
123
124
125
# File 'lib/whodunit/chronicles/service.rb', line 118

def build_adapter
  case Chronicles.config.adapter
  when :postgresql
    Adapters::PostgreSQL.new(logger: logger)
  else
    raise ConfigurationError, "Unsupported adapter: #{Chronicles.config.adapter}"
  end
end

#handle_streaming_error(error) ⇒ Object (private)



184
185
186
187
188
189
190
191
192
193
194
# File 'lib/whodunit/chronicles/service.rb', line 184

def handle_streaming_error(error)
  @retry_count += 1
  log(:error, 'Streaming error occurred',
    error: error.message,
    retry_count: @retry_count,
    max_retries: Chronicles.config.max_retry_attempts
  )

  # Wait before retry
  sleep(Chronicles.config.retry_delay) if should_retry?
end

#log(level, message, context = {}) ⇒ Object (private)



200
201
202
# File 'lib/whodunit/chronicles/service.rb', line 200

def log(level, message, context = {})
  logger.public_send(level, message, service: 'Chronicles::Service', **context)
end

#process_change_event(change_event) ⇒ Object (private)



160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# File 'lib/whodunit/chronicles/service.rb', line 160

def process_change_event(change_event)
  return unless change_event
  return unless should_audit_table?(change_event)

  log(:debug, 'Processing change event',
    table: change_event.qualified_table_name,
    action: change_event.action
  )

  processor.process(change_event)
rescue StandardError => e
  log(:error, 'Failed to process change event',
    error: e.message,
    event: change_event.to_s
  )
end

#running?Boolean

Check if service is running

Returns:

  • (Boolean)


76
77
78
# File 'lib/whodunit/chronicles/service.rb', line 76

def running?
  @running
end

#setup!void

This method returns an undefined value.

Set up the audit streaming infrastructure



100
101
102
103
104
# File 'lib/whodunit/chronicles/service.rb', line 100

def setup!
  log(:info, 'Setting up audit streaming infrastructure')
  adapter.setup
  log(:info, 'Audit streaming infrastructure setup completed')
end

#should_audit_table?(change_event) ⇒ Boolean (private)

Returns:

  • (Boolean)


177
178
179
180
181
182
# File 'lib/whodunit/chronicles/service.rb', line 177

def should_audit_table?(change_event)
  Chronicles.config.audit_table?(
    change_event.table_name,
    change_event.schema_name,
  )
end

#should_retry?Boolean (private)

Returns:

  • (Boolean)


196
197
198
# File 'lib/whodunit/chronicles/service.rb', line 196

def should_retry?
  running? && @retry_count < Chronicles.config.max_retry_attempts
end

#startself

Start the audit streaming service

Returns:

  • (self)


35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/whodunit/chronicles/service.rb', line 35

def start
  return self if running?

  log(:info, 'Starting Chronicles audit streaming service')

  validate_setup!
  test_connections!

  @running = true
  @retry_count = 0

  start_streaming_with_retry

  log(:info, 'Chronicles audit streaming service started successfully')
  self
rescue StandardError => e
  log(:error, 'Failed to start service', error: e.message)
  @running = false
  raise
end

#start_streaming_with_retryObject (private)



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/whodunit/chronicles/service.rb', line 143

def start_streaming_with_retry
  @executor.post do
    loop do
      break unless running?

      begin
        adapter.start_streaming do |change_event|
          process_change_event(change_event)
        end
      rescue StandardError => e
        handle_streaming_error(e)
        break unless should_retry?
      end
    end
  end
end

#statusHash

Get service status information

Returns:

  • (Hash)


83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/whodunit/chronicles/service.rb', line 83

def status
  {
    running: running?,
    adapter_streaming: adapter.streaming?,
    adapter_position: adapter.current_position,
    retry_count: @retry_count,
    executor_status: {
      active_count: @executor.active_count,
      completed_task_count: @executor.completed_task_count,
      queue_length: @executor.queue_length,
    },
  }
end

#stopvoid

This method returns an undefined value.

Stop the audit streaming service



59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/whodunit/chronicles/service.rb', line 59

def stop
  return unless running?

  log(:info, 'Stopping Chronicles audit streaming service')
  @running = false

  adapter.stop_streaming if adapter.streaming?
  @executor.shutdown
  @executor.wait_for_termination(timeout: 30)

  processor.close
  log(:info, 'Chronicles audit streaming service stopped')
end

#teardown!void

This method returns an undefined value.

Tear down the audit streaming infrastructure



109
110
111
112
113
114
# File 'lib/whodunit/chronicles/service.rb', line 109

def teardown!
  log(:info, 'Tearing down audit streaming infrastructure')
  stop if running?
  adapter.teardown
  log(:info, 'Audit streaming infrastructure teardown completed')
end

#test_connections!Object (private)



135
136
137
138
139
140
141
# File 'lib/whodunit/chronicles/service.rb', line 135

def test_connections!
  adapter.test_connection
  # Test audit processor connection by creating a dummy connection
  processor.send(:ensure_audit_connection)
rescue StandardError => e
  raise AdapterError, "Connection test failed: #{e.message}"
end

#validate_setup!Object (private)

Raises:



127
128
129
130
131
132
133
# File 'lib/whodunit/chronicles/service.rb', line 127

def validate_setup!
  Chronicles.config.validate!

  return if adapter.test_connection

  raise AdapterError, 'Failed to connect to source database'
end