Class: Mammoth::DeliveredEnvelopeStore
- Inherits:
-
Object
- Object
- Mammoth::DeliveredEnvelopeStore
- Defined in:
- lib/mammoth/delivered_envelope_store.rb
Overview
SQLite-backed ledger of downstream deliveries.
The PostgreSQL replication boundary is at-least-once. Mammoth therefore keeps a small delivery ledger so a transaction replayed by pgoutput-client after restart does not have to be delivered downstream again.
Constant Summary collapse
- SCHEMA =
<<~SQL CREATE TABLE IF NOT EXISTS delivered_envelopes ( id INTEGER PRIMARY KEY, idempotency_key TEXT NOT NULL, source_name TEXT NOT NULL, slot_name TEXT NOT NULL, destination_name TEXT NOT NULL, delivery_unit TEXT NOT NULL, transaction_id TEXT, source_position TEXT, delivered_at TEXT NOT NULL, UNIQUE (idempotency_key) ); CREATE INDEX IF NOT EXISTS idx_delivered_envelopes_source ON delivered_envelopes(source_name, slot_name); CREATE INDEX IF NOT EXISTS idx_delivered_envelopes_destination ON delivered_envelopes(destination_name); CREATE INDEX IF NOT EXISTS idx_delivered_envelopes_source_position ON delivered_envelopes(source_position); SQL
Instance Attribute Summary collapse
-
#sqlite_store ⇒ Object
readonly
Returns the value of attribute sqlite_store.
Instance Method Summary collapse
- #all ⇒ Object
- #count ⇒ Object
- #delivered?(idempotency_key) ⇒ Boolean
-
#initialize(sqlite_store) ⇒ DeliveredEnvelopeStore
constructor
A new instance of DeliveredEnvelopeStore.
- #record!(idempotency_key:, source_name:, slot_name:, destination_name:, delivery_unit:, transaction_id:, source_position:) ⇒ Object
Constructor Details
#initialize(sqlite_store) ⇒ DeliveredEnvelopeStore
Returns a new instance of DeliveredEnvelopeStore.
39 40 41 42 |
# File 'lib/mammoth/delivered_envelope_store.rb', line 39 def initialize(sqlite_store) @sqlite_store = sqlite_store ensure_schema! end |
Instance Attribute Details
#sqlite_store ⇒ Object (readonly)
Returns the value of attribute sqlite_store.
37 38 39 |
# File 'lib/mammoth/delivered_envelope_store.rb', line 37 def sqlite_store @sqlite_store end |
Instance Method Details
#all ⇒ Object
78 79 80 |
# File 'lib/mammoth/delivered_envelope_store.rb', line 78 def all database.execute("SELECT * FROM delivered_envelopes ORDER BY id") end |
#count ⇒ Object
82 83 84 |
# File 'lib/mammoth/delivered_envelope_store.rb', line 82 def count database.get_first_value("SELECT COUNT(*) FROM delivered_envelopes") end |
#delivered?(idempotency_key) ⇒ Boolean
44 45 46 47 48 49 |
# File 'lib/mammoth/delivered_envelope_store.rb', line 44 def delivered?(idempotency_key) !database.execute( "SELECT 1 FROM delivered_envelopes WHERE idempotency_key = ? LIMIT 1", [idempotency_key] ).empty? end |
#record!(idempotency_key:, source_name:, slot_name:, destination_name:, delivery_unit:, transaction_id:, source_position:) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 |
# File 'lib/mammoth/delivered_envelope_store.rb', line 51 def record!(idempotency_key:, source_name:, slot_name:, destination_name:, delivery_unit:, transaction_id:, source_position:) database.execute( <<~SQL, INSERT OR IGNORE INTO delivered_envelopes( idempotency_key, source_name, slot_name, destination_name, delivery_unit, transaction_id, source_position, delivered_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) SQL [ idempotency_key, source_name, slot_name, destination_name, delivery_unit, transaction_id, source_position, Time.now.utc.iso8601 ] ) database.get_first_row( "SELECT * FROM delivered_envelopes WHERE idempotency_key = ? LIMIT 1", [idempotency_key] ) end |