Class: Whodunit::Chronicles::Service
- Inherits:
-
Object
- Object
- Whodunit::Chronicles::Service
- Defined in:
- lib/whodunit/chronicles/service.rb
Overview
Main service orchestrator for audit streaming
Coordinates the stream adapter and audit processor to provide a complete audit streaming solution with error handling and monitoring.
Instance Attribute Summary collapse
-
#adapter ⇒ Object
readonly
Returns the value of attribute adapter.
-
#executor ⇒ Object
readonly
Returns the value of attribute executor.
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#processor ⇒ Object
readonly
Returns the value of attribute processor.
Instance Method Summary collapse
-
#build_adapter ⇒ Object
private
-
#handle_streaming_error(error) ⇒ Object
private
-
#initialize(adapter: nil, processor: nil, logger: Chronicles.logger) ⇒ Service
constructor
A new instance of Service.
-
#log(level, message, context = {}) ⇒ Object
private
-
#process_change_event(change_event) ⇒ Object
private
-
#running? ⇒ Boolean
Check if service is running.
-
#setup! ⇒ void
Set up the audit streaming infrastructure.
-
#should_audit_table?(change_event) ⇒ Boolean
private
-
#should_retry? ⇒ Boolean
private
-
#start ⇒ self
Start the audit streaming service.
-
#start_streaming_with_retry ⇒ Object
private
-
#status ⇒ Hash
Get service status information.
-
#stop ⇒ void
Stop the audit streaming service.
-
#teardown! ⇒ void
Tear down the audit streaming infrastructure.
-
#test_connections! ⇒ Object
private
-
#validate_setup! ⇒ Object
private
Constructor Details
#initialize(adapter: nil, processor: nil, logger: Chronicles.logger) ⇒ Service
Returns a new instance of Service.
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/whodunit/chronicles/service.rb', line 14
#
# Wires the service together from its collaborators and creates the
# thread pool used for background streaming.
#
# @param adapter [Object, nil] stream adapter; when nil, one is built with
#   {#build_adapter}
# @param processor [Object, nil] audit processor; when nil, an
#   AuditProcessor is created with +logger+
# @param logger [Object] logger shared by the service and its defaults
#   (defaults to Chronicles.logger)
# @return [Service] a new instance of Service
def initialize(adapter: nil, processor: nil, logger: Chronicles.logger)
  # Assign the logger first: build_adapter reads it through the #logger
  # reader, which would still return nil if @logger were set afterwards.
  @logger = logger
  @adapter = adapter || build_adapter
  @processor = processor || AuditProcessor.new(logger: logger)
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: 4,
    max_queue: 100,
    fallback_policy: :caller_runs,
  )
  @running = false
  @retry_count = 0
end
Instance Attribute Details
#adapter ⇒ Object (readonly)
Returns the value of attribute adapter.
12 13 14 |
# File 'lib/whodunit/chronicles/service.rb', line 12
#
# Read-only accessor for the adapter stored by the constructor.
#
# @return [Object] the value of @adapter
def adapter
  @adapter
end
#executor ⇒ Object (readonly)
Returns the value of attribute executor.
12 13 14 |
# File 'lib/whodunit/chronicles/service.rb', line 12
#
# Read-only accessor for the executor stored by the constructor.
#
# @return [Object] the value of @executor
def executor
  @executor
end
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
12 13 14 |
# File 'lib/whodunit/chronicles/service.rb', line 12
#
# Read-only accessor for the logger stored by the constructor.
#
# @return [Object] the value of @logger
def logger
  @logger
end
#processor ⇒ Object (readonly)
Returns the value of attribute processor.
12 13 14 |
# File 'lib/whodunit/chronicles/service.rb', line 12
#
# Read-only accessor for the processor stored by the constructor.
#
# @return [Object] the value of @processor
def processor
  @processor
end
Instance Method Details
#build_adapter ⇒ Object (private)
118 119 120 121 122 123 124 125 |
# File 'lib/whodunit/chronicles/service.rb', line 118
#
# Instantiates the stream adapter named by Chronicles.config.adapter.
# Only :postgresql is recognised here.
#
# @return [Object] an Adapters::PostgreSQL built with this service's logger
# @raise [ConfigurationError] for any other configured adapter value
def build_adapter
  configured = Chronicles.config.adapter
  return Adapters::PostgreSQL.new(logger: logger) if :postgresql == configured

  raise ConfigurationError, "Unsupported adapter: #{Chronicles.config.adapter}"
end
#handle_streaming_error(error) ⇒ Object (private)
184 185 186 187 188 189 190 191 192 193 194 |
# File 'lib/whodunit/chronicles/service.rb', line 184
#
# Records a streaming failure: increments the retry counter, logs the error
# together with the current and maximum retry counts, and pauses before the
# next attempt when a retry is still allowed.
#
# @param error [StandardError] the error raised while streaming
# @return [Object] the result of +sleep+ when a retry is pending, otherwise nil
def handle_streaming_error(error)
  @retry_count += 1
  log(:error, 'Streaming error occurred',
    error: error.message,
    retry_count: @retry_count,
    max_retries: Chronicles.config.max_retry_attempts
  )

  # Wait before retry
  sleep(Chronicles.config.retry_delay) if should_retry?
end
#log(level, message, context = {}) ⇒ Object (private)
200 201 202 |
# File 'lib/whodunit/chronicles/service.rb', line 200
#
# Sends a message to the logger at the given level, tagging it with
# service: 'Chronicles::Service' and any extra context.
#
# @param level [Symbol] name of the logger method to call (e.g. :info)
# @param message [String] text to log
# @param context [Hash] extra key/value pairs, passed on as keyword arguments
# @return [Object] whatever the logger method returns
def log(level, message, context = {})
  logger.public_send(level, message, service: 'Chronicles::Service', **context)
end
#process_change_event(change_event) ⇒ Object (private)
160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/whodunit/chronicles/service.rb', line 160
#
# Hands one change event to the audit processor.
# Nil events and events for tables that should not be audited are ignored.
# Any StandardError raised along the way is logged and swallowed, so one bad
# event does not propagate to the caller.
#
# @param change_event [Object, nil] event yielded by the adapter
# @return [Object, nil] the processor's result, the result of the error log
#   call, or nil when the event is skipped
def process_change_event(change_event)
  return unless change_event
  return unless should_audit_table?(change_event)

  log(:debug, 'Processing change event',
    table: change_event.qualified_table_name,
    action: change_event.action
  )

  processor.process(change_event)
rescue StandardError => e
  log(:error, 'Failed to process change event',
    error: e.message,
    event: change_event.to_s
  )
end
#running? ⇒ Boolean
Check if service is running
76 77 78 |
# File 'lib/whodunit/chronicles/service.rb', line 76
#
# Reports the running flag maintained by #start and #stop.
#
# @return [Boolean] the current value of @running
def running?
  @running
end
#setup! ⇒ void
This method returns an undefined value.
Set up the audit streaming infrastructure
100 101 102 103 104 |
# File 'lib/whodunit/chronicles/service.rb', line 100
#
# Asks the adapter to set itself up, logging an info message before and
# after the call.
#
# @return [void]
def setup!
  log(:info, 'Setting up audit streaming infrastructure')

  adapter.setup

  log(:info, 'Audit streaming infrastructure setup completed')
end
#should_audit_table?(change_event) ⇒ Boolean (private)
177 178 179 180 181 182 |
# File 'lib/whodunit/chronicles/service.rb', line 177
#
# Asks the configuration whether the event's table should be audited.
#
# @param change_event [Object] must respond to #table_name and #schema_name
# @return [Boolean] the answer from Chronicles.config.audit_table?
def should_audit_table?(change_event)
  config = Chronicles.config
  config.audit_table?(change_event.table_name, change_event.schema_name)
end
#should_retry? ⇒ Boolean (private)
196 197 198 |
# File 'lib/whodunit/chronicles/service.rb', line 196
#
# A retry is allowed only while the service is running and the retry counter
# is still below Chronicles.config.max_retry_attempts.
#
# @return [Boolean] whether another streaming attempt may be made
def should_retry?
  active = running?
  # Hand back the falsy running state itself, as `running? && ...` would.
  return active unless active

  @retry_count < Chronicles.config.max_retry_attempts
end
#start ⇒ self
Start the audit streaming service
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/whodunit/chronicles/service.rb', line 35
#
# Starts the audit streaming service.
# Does nothing when the service is already running. Otherwise it validates
# the setup, tests the connections, marks the service as running, resets the
# retry counter and kicks off streaming.
#
# @return [self]
# @raise [StandardError] any error raised during startup is logged, the
#   running flag is cleared, and the error is re-raised unchanged
def start
  return self if running?

  log(:info, 'Starting Chronicles audit streaming service')

  validate_setup!
  test_connections!

  @running = true
  @retry_count = 0

  start_streaming_with_retry

  log(:info, 'Chronicles audit streaming service started successfully')
  self
rescue StandardError => e
  log(:error, 'Failed to start service', error: e.message)
  @running = false
  raise
end
#start_streaming_with_retry ⇒ Object (private)
143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/whodunit/chronicles/service.rb', line 143
#
# Posts the streaming loop to the executor.
# The loop keeps asking the adapter to stream while the service is running,
# passing every yielded change event to #process_change_event. When streaming
# raises a StandardError, the error goes to #handle_streaming_error and the
# loop ends once #should_retry? says no.
#
# @return [Object] whatever @executor.post returns
def start_streaming_with_retry
  @executor.post do
    loop do
      break unless running?

      begin
        adapter.start_streaming { |event| process_change_event(event) }
      rescue StandardError => streaming_error
        handle_streaming_error(streaming_error)
        break unless should_retry?
      end
    end
  end
end
#status ⇒ Hash
Get service status information
83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/whodunit/chronicles/service.rb', line 83
#
# Snapshot of the service state: the running flag, the adapter's streaming
# state and position, the retry counter and the executor's counters.
#
# @return [Hash] keys :running, :adapter_streaming, :adapter_position,
#   :retry_count and :executor_status (itself a Hash with :active_count,
#   :completed_task_count and :queue_length)
def status
  snapshot = {
    running: running?,
    adapter_streaming: adapter.streaming?,
    adapter_position: adapter.current_position,
    retry_count: @retry_count,
  }
  # Read the executor counters last, matching the key order of the result.
  snapshot[:executor_status] = {
    active_count: @executor.active_count,
    completed_task_count: @executor.completed_task_count,
    queue_length: @executor.queue_length,
  }
  snapshot
end
#stop ⇒ void
This method returns an undefined value.
Stop the audit streaming service
59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/whodunit/chronicles/service.rb', line 59
#
# Stops the audit streaming service.
# Does nothing when the service is not running. Otherwise it clears the
# running flag, stops the adapter if it is streaming, shuts the executor
# down, waits up to 30 seconds for it to terminate, and closes the processor.
#
# @return [void]
def stop
  return unless running?

  log(:info, 'Stopping Chronicles audit streaming service')
  @running = false

  adapter.stop_streaming if adapter.streaming?
  @executor.shutdown
  # The timeout is a positional argument; a `timeout:` keyword would arrive
  # as a Hash in place of the number of seconds.
  @executor.wait_for_termination(30)

  processor.close
  log(:info, 'Chronicles audit streaming service stopped')
end
#teardown! ⇒ void
This method returns an undefined value.
Tear down the audit streaming infrastructure
109 110 111 112 113 114 |
# File 'lib/whodunit/chronicles/service.rb', line 109
#
# Stops the service if it is still running, then asks the adapter to tear
# itself down. An info message is logged before and after.
#
# @return [void]
def teardown!
  log(:info, 'Tearing down audit streaming infrastructure')

  stop if running?
  adapter.teardown

  log(:info, 'Audit streaming infrastructure teardown completed')
end
#test_connections! ⇒ Object (private)
135 136 137 138 139 140 141 |
# File 'lib/whodunit/chronicles/service.rb', line 135
#
# Checks both connections used by the service: the adapter's, via
# #test_connection, and the processor's, via its ensure_audit_connection
# method.
#
# @return [Object] the result of the processor's ensure_audit_connection
# @raise [AdapterError] when either check raises a StandardError; the
#   original error message is included in the new one
def test_connections!
  adapter.test_connection

  # Test audit processor connection by creating a dummy connection
  # (`send` is used here; presumably ensure_audit_connection is private).
  processor.send(:ensure_audit_connection)
rescue StandardError => e
  raise AdapterError, "Connection test failed: #{e.message}"
end
#validate_setup! ⇒ Object (private)
127 128 129 130 131 132 133 |
# File 'lib/whodunit/chronicles/service.rb', line 127
#
# Validates the configuration, then checks that the adapter can connect.
#
# @return [nil] when the configuration is valid and the adapter connects
# @raise [AdapterError] when adapter.test_connection returns a falsy value
def validate_setup!
  Chronicles.config.validate!

  raise AdapterError, 'Failed to connect to source database' unless adapter.test_connection
end