Class: CDC::SolidQueue::EventSerializer
- Inherits: Object
- Defined in: lib/cdc/solid_queue/event_serializer.rb
Overview
Converts CDC events into Solid Queue-safe payloads.
Payloads are plain hashes, so Active Job can serialize them without loading the original event object in the queue database.
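As a rough illustration of why plain hashes are convenient here, the sketch below round-trips a payload through Ruby's standard `json` library. It does not load Active Job or this gem, and the field values are invented for the example; it only shows that a string-keyed hash of JSON-friendly values comes back unchanged, while symbol keys do not survive a bare JSON round trip.

```ruby
require 'json'

# Hypothetical payload; the keys mirror ones read by ORDERING_VALUE_FETCHERS,
# the values are made up for illustration.
payload = {
  'table' => 'orders',
  'primary_key' => { 'id' => 42 },
  'transaction_id' => 9001
}

# A string-keyed hash of JSON-friendly values is unchanged by a round trip.
round_tripped = JSON.parse(JSON.generate(payload))

# Symbol keys come back as strings, so the result no longer equals the input.
symbol_keyed = { table: 'orders' }
symbol_round_tripped = JSON.parse(JSON.generate(symbol_keyed))
```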
Constant Summary
- INTERNAL_METADATA_KEY =
  Reserved payload key for cdc-solid-queue enqueue metadata.
    '_cdc_solid_queue'
- ORDERING_VALUE_FETCHERS =
  Lookup table for ordering value extraction by ordering key.
    {
      identity: ->(payload) { payload['identity'] || payload['primary_key'] },
      primary_key: ->(payload) { payload['identity'] || payload['primary_key'] },
      relation: ->(payload) { [payload['namespace'] || payload['schema'], payload['entity'] || payload['table']] },
      transaction: ->(payload) { payload['transaction_id'] },
      global: ->(payload) { payload['source_position'] || payload['commit_lsn'] }
    }.freeze
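To make the lookup table concrete, the sketch below copies the lambdas into a local variable and applies each one to a sample payload. The payload and its values are hypothetical; only the lambdas come from the constant above, and the gem itself is not loaded.

```ruby
# Lambdas copied from ORDERING_VALUE_FETCHERS; held in a local so the sketch
# runs without the gem.
fetchers = {
  identity: ->(payload) { payload['identity'] || payload['primary_key'] },
  primary_key: ->(payload) { payload['identity'] || payload['primary_key'] },
  relation: lambda { |payload|
    [payload['namespace'] || payload['schema'], payload['entity'] || payload['table']]
  },
  transaction: ->(payload) { payload['transaction_id'] },
  global: ->(payload) { payload['source_position'] || payload['commit_lsn'] }
}.freeze

# Hypothetical payload that uses the fallback names (schema, table,
# primary_key, commit_lsn) rather than the preferred ones.
payload = {
  'schema' => 'public',
  'table' => 'orders',
  'primary_key' => { 'id' => 42 },
  'transaction_id' => 9001,
  'commit_lsn' => '0/16B3748'
}

# Apply every fetcher to the same payload.
values = fetchers.transform_values { |fetcher| fetcher.call(payload) }
```

Each fetcher prefers the first key and falls back to the second, so `:identity` and `:primary_key` resolve to the same value for any payload.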
Class Method Summary
- .dump(event) ⇒ Hash
  Serialize an event-like object.
- .dump_batch(events) ⇒ Array<Hash>
  Serialize a batch of event-like objects.
- .enqueue_metadata(payload) ⇒ Hash
  Return cdc-solid-queue metadata from an enqueued payload.
- .load(payload) ⇒ Hash
  Load a serialized event payload.
- .load_batch(payloads) ⇒ Array<Hash>
  Load a batch of serialized event payloads.
- .load_event(payload) ⇒ CDC::Core::ChangeEvent, Hash
  Load a serialized event payload into a CDC event when possible.
- .ordering_value(payload, key) ⇒ Object?
  Return the ordering value for a serialized event.
- .with_enqueue_metadata(payload, metadata) ⇒ Hash
  Attach enqueue metadata without changing the event representation.
Class Method Details
.dump(event) ⇒ Hash
Serialize an event-like object.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 27
    def self.dump(event)
      payload = if event.is_a?(Hash)
                  event
                elsif event.respond_to?(:to_h)
                  event.to_h
                else
                  raise SerializationError, 'event must respond to to_h or be a Hash'
                end

      normalize_hash(payload)
    end
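The three branches of `.dump` can be exercised with a standalone sketch. It does not load the gem: `SerializationError` is a local stand-in, `OrderEvent` is a hypothetical event type, and `normalize_hash` is an assumption, because the real private helper is not shown on this page. Here it is assumed to convert keys to strings, which matches the string-key lookups in `ORDERING_VALUE_FETCHERS`.

```ruby
# Stand-in for the gem's error class so the sketch runs on its own.
class SerializationError < StandardError; end

# ASSUMPTION: the real normalize_hash is private and not documented here.
# This version recursively converts hash keys to strings.
def normalize_hash(value)
  case value
  when Hash then value.each_with_object({}) { |(k, v), out| out[k.to_s] = normalize_hash(v) }
  when Array then value.map { |item| normalize_hash(item) }
  else value
  end
end

# Same branching as .dump in the listing above.
def dump(event)
  payload = if event.is_a?(Hash)
              event
            elsif event.respond_to?(:to_h)
              event.to_h
            else
              raise SerializationError, 'event must respond to to_h or be a Hash'
            end

  normalize_hash(payload)
end

# Hypothetical event type; any object that responds to to_h takes the second branch.
OrderEvent = Struct.new(:table, :operation)

from_hash = dump({ table: 'orders', operation: 'insert' })
from_struct = dump(OrderEvent.new('orders', 'insert'))

# An Integer is neither a Hash nor responds to to_h, so it is rejected.
rejected = begin
  dump(42)
rescue SerializationError => e
  e.message
end
```

Note that `nil` responds to `to_h` in Ruby, so under this branching it would take the second branch rather than raise.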
.dump_batch(events) ⇒ Array<Hash>
Serialize a batch of event-like objects.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 43
    def self.dump_batch(events)
      raise SerializationError, 'events must be an Array' unless events.is_a?(Array)

      events.map { |event| dump(event) }
    end
.enqueue_metadata(payload) ⇒ Hash
Return cdc-solid-queue metadata from an enqueued payload.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 103
    def self.enqueue_metadata(payload)
      # The name of the helper called for Array payloads is missing from this listing.
      return (payload) if payload.is_a?(Array)

      normalized = normalize_hash(payload)
      metadata = normalized[INTERNAL_METADATA_KEY]
      metadata.is_a?(Hash) ? metadata : {}
    end
.load(payload) ⇒ Hash
Load a serialized event payload.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 54
    def self.load(payload)
      raise SerializationError, 'payload must be a Hash' unless payload.is_a?(Hash)

      # The name of the helper wrapping this call is missing from this listing.
      (normalize_hash(payload))
    end
.load_batch(payloads) ⇒ Array<Hash>
Load a batch of serialized event payloads.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 64
    def self.load_batch(payloads)
      raise SerializationError, 'payloads must be an Array' unless payloads.is_a?(Array)

      payloads.map { |payload| load(payload) }
    end
.load_event(payload) ⇒ CDC::Core::ChangeEvent, Hash
Load a serialized event payload into a CDC event when possible.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 74
    def self.load_event(payload)
      return load_batch(payload).map { |item| load_event(item) } if payload.is_a?(Array)

      normalized = load(payload)
      return normalized unless change_event_payload?(normalized)

      build_change_event(normalized)
    end
.ordering_value(payload, key) ⇒ Object?
Return the ordering value for a serialized event.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 116
    def self.ordering_value(payload, key)
      return payload.map { |item| ordering_value(item, key) } if payload.is_a?(Array)
      return nil if key == :none

      fetcher = ORDERING_VALUE_FETCHERS[key]
      return nil unless fetcher

      fetcher.call(load(payload))
    end
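The control flow of `.ordering_value` (array recursion first, then the `:none` and unknown-key guards) can be traced with a small standalone sketch. It assumes payloads are already string-keyed hashes and therefore skips the `.load` call; `FETCHERS` is a two-entry subset of `ORDERING_VALUE_FETCHERS`, and the batch contents and the `:shard` key are hypothetical.

```ruby
# Two entries copied from ORDERING_VALUE_FETCHERS; the others are omitted.
FETCHERS = {
  transaction: ->(payload) { payload['transaction_id'] },
  global: ->(payload) { payload['source_position'] || payload['commit_lsn'] }
}.freeze

# Mirrors .ordering_value. ASSUMPTION: payloads are already normalized,
# so the .load step from the listing is left out.
def ordering_value(payload, key)
  return payload.map { |item| ordering_value(item, key) } if payload.is_a?(Array)
  return nil if key == :none

  fetcher = FETCHERS[key]
  return nil unless fetcher

  fetcher.call(payload)
end

# Hypothetical batch; the position values are invented.
batch = [
  { 'transaction_id' => 7, 'commit_lsn' => '0/A1' },
  { 'transaction_id' => 8, 'source_position' => 'binlog.000003:154' }
]

per_event = ordering_value(batch, :transaction)
positions = ordering_value(batch, :global)
disabled = ordering_value(batch.first, :none)
unknown = ordering_value(batch.first, :shard) # :shard is not a known key
batch_disabled = ordering_value(batch, :none)
```

Because the Array check runs before the `:none` guard, a batch with `:none` yields one `nil` per element rather than a single `nil`.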
.with_enqueue_metadata(payload, metadata) ⇒ Hash
Attach enqueue metadata without changing the event representation.
    # File 'lib/cdc/solid_queue/event_serializer.rb', line 88
    def self.with_enqueue_metadata(payload, metadata)
      if payload.is_a?(Array)
        return payload.each_with_index.map do |child, index|
          # The names of the two helpers called here are missing from this listing.
          (child, (metadata, index))
        end
      end

      normalized = normalize_hash(payload)
      normalized.merge(INTERNAL_METADATA_KEY => normalize_hash(metadata))
    end
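A minimal round trip between `.with_enqueue_metadata` and `.enqueue_metadata` might look like the sketch below. It covers only the single-Hash branch of each method and does not load the gem. `normalize_hash` is an assumed stand-in that stringifies keys, and the metadata fields (`attempt`, `queue`) are hypothetical.

```ruby
INTERNAL_METADATA_KEY = '_cdc_solid_queue' # value from the constant summary

# ASSUMPTION: stand-in for the private normalize_hash helper; it converts
# keys to strings, recursing into nested hashes.
def normalize_hash(hash)
  hash.to_h { |key, value| [key.to_s, value.is_a?(Hash) ? normalize_hash(value) : value] }
end

# Hash branch of .with_enqueue_metadata: merge returns a new hash.
def with_enqueue_metadata(payload, metadata)
  normalize_hash(payload).merge(INTERNAL_METADATA_KEY => normalize_hash(metadata))
end

# Hash branch of .enqueue_metadata: anything other than a Hash under the
# reserved key is treated as "no metadata".
def enqueue_metadata(payload)
  metadata = normalize_hash(payload)[INTERNAL_METADATA_KEY]
  metadata.is_a?(Hash) ? metadata : {}
end

event = { 'table' => 'orders', 'operation' => 'update' }
enqueued = with_enqueue_metadata(event, { attempt: 1, queue: 'cdc' })

stored = enqueue_metadata(enqueued)
missing = enqueue_metadata(event)
```

Since the metadata lives under one reserved key, the event's own fields are unchanged, and the original `event` hash is not mutated.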