From 7e31368fa3dd2f3ff142155f8eb9e0980a501644 Mon Sep 17 00:00:00 2001 From: Madeorsk Date: Tue, 22 Oct 2024 14:09:02 +0200 Subject: [PATCH] Introduce generic connectors and connections and use them in the queries. --- src/database.zig | 109 +++++++++++++++++++++++++++++++++++++++++++ src/insert.zig | 20 ++++---- src/postgresql.zig | 5 +- src/query.zig | 26 ++++++----- src/repository.zig | 14 +++--- src/root.zig | 2 + src/update.zig | 20 ++++---- tests/composite.zig | 14 ++++-- tests/repository.zig | 39 ++++++++++++---- 9 files changed, 195 insertions(+), 54 deletions(-) create mode 100644 src/database.zig diff --git a/src/database.zig b/src/database.zig new file mode 100644 index 0000000..0b54c3d --- /dev/null +++ b/src/database.zig @@ -0,0 +1,109 @@ +const std = @import("std"); +const pg = @import("pg"); +const session = @import("session.zig"); + +/// Abstract connection, provided by a connector. +pub const Connection = struct { + /// Raw connection. + connection: *pg.Conn, + + /// Connection implementation. + _interface: struct { + instance: *anyopaque, + release: *const fn (self: *Connection) void, + }, + + /// Release the connection. + pub fn release(self: *Connection) void { + self._interface.release(self); + } +}; + +/// Database connection manager for queries. +pub const Connector = struct { + const Self = @This(); + + /// Internal interface structure. + _interface: struct { + instance: *anyopaque, + getConnection: *const fn (self: *anyopaque) anyerror!*Connection, + }, + + /// Get a connection. + pub fn getConnection(self: Self) !*Connection { + return try self._interface.getConnection(self._interface.instance); + } +}; + + + +/// A simple pool connection. +pub const PoolConnection = struct { + const Self = @This(); + + /// Connector of the connection. + connector: *PoolConnector, + /// Connection instance, to only keep one at a time. + _connection: ?Connection = null, + + /// Get a database connection. + pub fn connection(self: *Self) !*Connection { + if (self._connection == null) { + // A new connection needs to be initialized. + self._connection = .{ + .connection = try self.connector.pool.acquire(), + ._interface = .{ + .instance = self, + .release = releaseConnection, + }, + }; + } + + // Return the initialized connection. + return &(self._connection.?); + } + + // Implementation. + + /// Release the pool connection. + fn releaseConnection(self: *Connection) void { + self.connection.release(); + + // Free allocated connection. + const poolConnection: *PoolConnection = @ptrCast(@alignCast(self._interface.instance)); + poolConnection.connector.pool._allocator.destroy(poolConnection); + } +}; + +/// A simple pool connector. +pub const PoolConnector = struct { + const Self = @This(); + + pool: *pg.Pool, + + /// Get a database connector instance for the current pool. + pub fn connector(self: *Self) Connector { + return .{ + ._interface = .{ + .instance = self, + .getConnection = getConnection, + }, + }; + } + + // Implementation. + + /// Get the connection from the pool. + fn getConnection(opaqueSelf: *anyopaque) !*Connection { + const self: *Self = @ptrCast(@alignCast(opaqueSelf)); + + // Initialize a new connection. + const poolConnection = try self.pool._allocator.create(PoolConnection); + poolConnection.* = .{ + .connector = self, + }; + + // Acquire a new connection from the pool. + return try poolConnection.connection(); + } +}; diff --git a/src/insert.zig b/src/insert.zig index b102f3e..ceba9d5 100644 --- a/src/insert.zig +++ b/src/insert.zig @@ -2,6 +2,7 @@ const std = @import("std"); const pg = @import("pg"); const zollections = @import("zollections"); const errors = @import("errors.zig"); +const database = @import("database.zig"); const postgresql = @import("postgresql.zig"); const _sql = @import("sql.zig"); const repository = @import("repository.zig"); @@ -118,7 +119,8 @@ pub fn RepositoryInsert(comptime Model: type, comptime TableShape: type, comptim const Configuration = RepositoryInsertConfiguration(InsertShape); arena: std.heap.ArenaAllocator, - database: *pg.Pool, + connector: database.Connector, + connection: *database.Connection = undefined, insertConfig: Configuration, sql: ?[]const u8 = null, @@ -289,20 +291,19 @@ pub fn RepositoryInsert(comptime Model: type, comptime TableShape: type, comptim /// Execute the insert query. fn execQuery(self: *Self) !*pg.Result { // Get a connection to the database. - const connection = try self.database.acquire(); - errdefer connection.release(); + self.connection = try self.connector.getConnection(); + errdefer self.connection.release(); // Initialize a new PostgreSQL statement. - var statement = try pg.Stmt.init(connection, .{ + var statement = try pg.Stmt.init(self.connection.connection, .{ .column_names = true, - .release_conn = true, .allocator = self.arena.allocator(), }); errdefer statement.deinit(); // Prepare SQL insert query. statement.prepare(self.sql.?) - catch |err| return postgresql.handlePostgresqlError(err, connection, &statement); + catch |err| return postgresql.handlePostgresqlError(err, self.connection, &statement); // Bind INSERT query parameters. for (self.insertConfig.values) |row| { @@ -317,7 +318,7 @@ pub fn RepositoryInsert(comptime Model: type, comptime TableShape: type, comptim // Execute the query and get its result. const result = statement.execute() - catch |err| return postgresql.handlePostgresqlError(err, connection, &statement); + catch |err| return postgresql.handlePostgresqlError(err, self.connection, &statement); // Query executed successfully, return the result. return result; @@ -330,6 +331,7 @@ pub fn RepositoryInsert(comptime Model: type, comptime TableShape: type, comptim // Execute query and get its result. var queryResult = try self.execQuery(); + defer self.connection.release(); defer queryResult.deinit(); //TODO deduplicate this in postgresql.zig, we could do it if Mapper type was exposed. @@ -359,11 +361,11 @@ pub fn RepositoryInsert(comptime Model: type, comptime TableShape: type, comptim } /// Initialize a new repository insert query. - pub fn init(allocator: std.mem.Allocator, database: *pg.Pool) Self { + pub fn init(allocator: std.mem.Allocator, connector: database.Connector) Self { return .{ // Initialize an arena allocator for the insert query. .arena = std.heap.ArenaAllocator.init(allocator), - .database = database, + .connector = connector, .insertConfig = .{}, }; } diff --git a/src/postgresql.zig b/src/postgresql.zig index f74faae..afbde25 100644 --- a/src/postgresql.zig +++ b/src/postgresql.zig @@ -2,6 +2,7 @@ const std = @import("std"); const pg = @import("pg"); const global = @import("global.zig"); const errors = @import("errors.zig"); +const database = @import("database.zig"); const _sql = @import("sql.zig"); const repository = @import("repository.zig"); @@ -31,12 +32,12 @@ pub fn bindQueryParameter(statement: *pg.Stmt, parameter: _sql.QueryParameter) ! } /// PostgreSQL error handling by ZRM. -pub fn handlePostgresqlError(err: anyerror, connection: *pg.Conn, statement: *pg.Stmt) anyerror { +pub fn handlePostgresqlError(err: anyerror, connection: *database.Connection, statement: *pg.Stmt) anyerror { // Release connection and statement as query failed. defer statement.deinit(); defer connection.release(); - if (connection.err) |sqlErr| { + if (connection.connection.err) |sqlErr| { if (global.debugMode) { // If debug mode is enabled, show the PostgreSQL error. std.debug.print("PostgreSQL error\n{s}: {s}\n", .{sqlErr.code, sqlErr.message}); diff --git a/src/query.zig b/src/query.zig index a462810..0234a13 100644 --- a/src/query.zig +++ b/src/query.zig @@ -2,6 +2,7 @@ const std = @import("std"); const pg = @import("pg"); const zollections = @import("zollections"); const errors = @import("errors.zig"); +const database = @import("database.zig"); const postgresql = @import("postgresql.zig"); const _sql = @import("sql.zig"); const conditions = @import("conditions.zig"); @@ -29,7 +30,8 @@ pub fn RepositoryQuery(comptime Model: type, comptime TableShape: type, comptime const Self = @This(); arena: std.heap.ArenaAllocator, - database: *pg.Pool, + connector: database.Connector, + connection: *database.Connection = undefined, queryConfig: RepositoryQueryConfiguration, sql: ?[]const u8 = null, @@ -176,23 +178,21 @@ pub fn RepositoryQuery(comptime Model: type, comptime TableShape: type, comptime } /// Execute the built query. - fn execQuery(self: *Self) !*pg.Result - { - // Get a connection to the database. - const connection = try self.database.acquire(); - errdefer connection.release(); + fn execQuery(self: *Self) !*pg.Result { + // Get the connection to the database. + self.connection = try self.connector.getConnection(); + errdefer self.connection.release(); // Initialize a new PostgreSQL statement. - var statement = try pg.Stmt.init(connection, .{ + var statement = try pg.Stmt.init(self.connection.connection, .{ .column_names = true, - .release_conn = true, .allocator = self.arena.allocator(), }); errdefer statement.deinit(); // Prepare SQL query. statement.prepare(self.sql.?) - catch |err| return postgresql.handlePostgresqlError(err, connection, &statement); + catch |err| return postgresql.handlePostgresqlError(err, self.connection, &statement); // Bind query parameters. if (self.queryConfig.select) |_select| @@ -204,7 +204,7 @@ pub fn RepositoryQuery(comptime Model: type, comptime TableShape: type, comptime // Execute the query and get its result. const result = statement.execute() - catch |err| return postgresql.handlePostgresqlError(err, connection, &statement); + catch |err| return postgresql.handlePostgresqlError(err, self.connection, &statement); // Query executed successfully, return the result. return result; @@ -217,6 +217,7 @@ pub fn RepositoryQuery(comptime Model: type, comptime TableShape: type, comptime // Execute query and get its result. var queryResult = try self.execQuery(); + defer self.connection.release(); defer queryResult.deinit(); //TODO deduplicate this in postgresql.zig, we could do it if Mapper type was exposed. @@ -246,17 +247,18 @@ pub fn RepositoryQuery(comptime Model: type, comptime TableShape: type, comptime } /// Initialize a new repository query. - pub fn init(allocator: std.mem.Allocator, database: *pg.Pool, queryConfig: RepositoryQueryConfiguration) Self { + pub fn init(allocator: std.mem.Allocator, connector: database.Connector, queryConfig: RepositoryQueryConfiguration) Self { return .{ // Initialize the query arena allocator. .arena = std.heap.ArenaAllocator.init(allocator), - .database = database, + .connector = connector, .queryConfig = queryConfig, }; } /// Deinitialize the repository query. pub fn deinit(self: *Self) void { + // Free everything allocated for this query. self.arena.deinit(); } }; diff --git a/src/repository.zig b/src/repository.zig index d09b8a4..f251a72 100644 --- a/src/repository.zig +++ b/src/repository.zig @@ -1,6 +1,6 @@ const std = @import("std"); -const pg = @import("pg"); const zollections = @import("zollections"); +const database = @import("database.zig"); const _sql = @import("sql.zig"); const _conditions = @import("conditions.zig"); const query = @import("query.zig"); @@ -97,9 +97,9 @@ pub fn Repository(comptime Model: type, comptime TableShape: type, comptime conf /// modelKey can be an array / slice of keys. /// For composite keys: modelKey must be a struct with all the keys, matching the type of their corresponding field. /// modelKey can be an array / slice of these structs. - pub fn find(allocator: std.mem.Allocator, database: *pg.Pool, modelKey: anytype) !RepositoryResult(Model) { + pub fn find(allocator: std.mem.Allocator, connector: database.Connector, modelKey: anytype) !RepositoryResult(Model) { // Initialize a new query. - var modelQuery = Self.Query.init(allocator, database, .{}); + var modelQuery = Self.Query.init(allocator, connector, .{}); defer modelQuery.deinit(); if (config.key.len == 1) { @@ -179,9 +179,9 @@ pub fn Repository(comptime Model: type, comptime TableShape: type, comptime conf /// Perform creation of the given new model in the repository. /// The model will be altered with the inserted values. - pub fn create(allocator: std.mem.Allocator, database: *pg.Pool, newModel: *Model) !RepositoryResult(Model) { + pub fn create(allocator: std.mem.Allocator, connector: database.Connector, newModel: *Model) !RepositoryResult(Model) { // Initialize a new insert query for the given model. - var insertQuery = Self.Insert.init(allocator, database); + var insertQuery = Self.Insert.init(allocator, connector); defer insertQuery.deinit(); try insertQuery.values(newModel); insertQuery.returningAll(); @@ -199,12 +199,12 @@ pub fn Repository(comptime Model: type, comptime TableShape: type, comptime conf } /// Perform save of the given existing model in the repository. - pub fn save(allocator: std.mem.Allocator, database: *pg.Pool, existingModel: *Model) !RepositoryResult(Model) { + pub fn save(allocator: std.mem.Allocator, connector: database.Connector, existingModel: *Model) !RepositoryResult(Model) { // Convert the model to its SQL form. const modelSql = try config.toSql(existingModel.*); // Initialize a new update query for the given model. - var updateQuery = Self.Update(TableShape).init(allocator, database); + var updateQuery = Self.Update(TableShape).init(allocator, connector); defer updateQuery.deinit(); try updateQuery.set(modelSql); updateQuery.returningAll(); diff --git a/src/root.zig b/src/root.zig index 12e42c0..c673b7c 100644 --- a/src/root.zig +++ b/src/root.zig @@ -14,6 +14,8 @@ pub const Insertable = insert.Insertable; pub const QueryParameter = _sql.QueryParameter; pub const SqlParams = _sql.SqlParams; +pub const database = @import("database.zig"); + pub const conditions = @import("conditions.zig"); pub const errors = @import("errors.zig"); diff --git a/src/update.zig b/src/update.zig index 88b38a2..87b2d9b 100644 --- a/src/update.zig +++ b/src/update.zig @@ -2,6 +2,7 @@ const std = @import("std"); const pg = @import("pg"); const zollections = @import("zollections"); const errors = @import("errors.zig"); +const database = @import("database.zig"); const postgresql = @import("postgresql.zig"); const _sql = @import("sql.zig"); const conditions = @import("conditions.zig"); @@ -57,7 +58,8 @@ pub fn RepositoryUpdate(comptime Model: type, comptime TableShape: type, comptim const Configuration = RepositoryUpdateConfiguration(UpdateShape); arena: std.heap.ArenaAllocator, - database: *pg.Pool, + connector: database.Connector, + connection: *database.Connection = undefined, updateConfig: Configuration, sql: ?[]const u8 = null, @@ -261,20 +263,19 @@ pub fn RepositoryUpdate(comptime Model: type, comptime TableShape: type, comptim /// Execute the update query. fn execQuery(self: *Self) !*pg.Result { // Get a connection to the database. - const connection = try self.database.acquire(); - errdefer connection.release(); + self.connection = try self.connector.getConnection(); + errdefer self.connection.release(); // Initialize a new PostgreSQL statement. - var statement = try pg.Stmt.init(connection, .{ + var statement = try pg.Stmt.init(self.connection.connection, .{ .column_names = true, - .release_conn = true, .allocator = self.arena.allocator(), }); errdefer statement.deinit(); // Prepare SQL update query. statement.prepare(self.sql.?) - catch |err| return postgresql.handlePostgresqlError(err, connection, &statement); + catch |err| return postgresql.handlePostgresqlError(err, self.connection, &statement); // Bind UPDATE query parameters. inline for (columns) |column| { @@ -291,7 +292,7 @@ pub fn RepositoryUpdate(comptime Model: type, comptime TableShape: type, comptim // Execute the query and get its result. const result = statement.execute() - catch |err| return postgresql.handlePostgresqlError(err, connection, &statement); + catch |err| return postgresql.handlePostgresqlError(err, self.connection, &statement); // Query executed successfully, return the result. return result; @@ -304,6 +305,7 @@ pub fn RepositoryUpdate(comptime Model: type, comptime TableShape: type, comptim // Execute query and get its result. var queryResult = try self.execQuery(); + defer self.connection.release(); defer queryResult.deinit(); //TODO deduplicate this in postgresql.zig, we could do it if Mapper type was exposed. @@ -333,11 +335,11 @@ pub fn RepositoryUpdate(comptime Model: type, comptime TableShape: type, comptim } /// Initialize a new repository update query. - pub fn init(allocator: std.mem.Allocator, database: *pg.Pool) Self { + pub fn init(allocator: std.mem.Allocator, connector: database.Connector) Self { return .{ // Initialize an arena allocator for the update query. .arena = std.heap.ArenaAllocator.init(allocator), - .database = database, + .connector = connector, .updateConfig = .{}, }; } diff --git a/tests/composite.zig b/tests/composite.zig index f5479f8..6e51b8e 100644 --- a/tests/composite.zig +++ b/tests/composite.zig @@ -17,6 +17,7 @@ fn initDatabase() !void { .password = "zrm", .database = "zrm", }, + .size = 1, }); } @@ -76,6 +77,9 @@ test "composite model create, save and find" { try initDatabase(); defer database.deinit(); + var poolConnector = zrm.database.PoolConnector{ + .pool = database, + }; // Initialize a test model. var newModel = CompositeModel{ @@ -86,7 +90,7 @@ test "composite model create, save and find" { // Create the new model. - var result = try CompositeModelRepository.create(std.testing.allocator, database, &newModel); + var result = try CompositeModelRepository.create(std.testing.allocator, poolConnector.connector(), &newModel); defer result.deinit(); // Will clear some values in newModel. // Check that the model is correctly defined. @@ -101,7 +105,7 @@ test "composite model create, save and find" { // Update the model. newModel.label = null; - var result2 = try CompositeModelRepository.save(std.testing.allocator, database, &newModel); + var result2 = try CompositeModelRepository.save(std.testing.allocator, poolConnector.connector(), &newModel); defer result2.deinit(); // Will clear some values in newModel. // Checking that the model has been updated (but only the right field). @@ -111,7 +115,7 @@ test "composite model create, save and find" { // Do another insert with the same secondcol. - var insertQuery = CompositeModelRepository.Insert.init(std.testing.allocator, database); + var insertQuery = CompositeModelRepository.Insert.init(std.testing.allocator, poolConnector.connector()); defer insertQuery.deinit(); try insertQuery.values(.{ .secondcol = "identifier", @@ -128,7 +132,7 @@ test "composite model create, save and find" { // Try to find the created then saved model, to check that everything has been saved correctly. - var result4 = try CompositeModelRepository.find(std.testing.allocator, database, .{ + var result4 = try CompositeModelRepository.find(std.testing.allocator, poolConnector.connector(), .{ .firstcol = newModel.firstcol, .secondcol = newModel.secondcol, }); @@ -139,7 +143,7 @@ test "composite model create, save and find" { // Try to find multiple models at once. - var result5 = try CompositeModelRepository.find(std.testing.allocator, database, &[_]CompositeModelRepository.KeyType{ + var result5 = try CompositeModelRepository.find(std.testing.allocator, poolConnector.connector(), &[_]CompositeModelRepository.KeyType{ .{ .firstcol = newModel.firstcol, .secondcol = newModel.secondcol, diff --git a/tests/repository.zig b/tests/repository.zig index 6edb1da..1e0eb25 100644 --- a/tests/repository.zig +++ b/tests/repository.zig @@ -17,6 +17,7 @@ fn initDatabase() !void { .password = "zrm", .database = "zrm", }, + .size = 1, }); } @@ -105,8 +106,11 @@ test "repository query SQL builder" { try initDatabase(); defer database.deinit(); + var poolConnector = zrm.database.PoolConnector{ + .pool = database, + }; - var query = MyModelRepository.Query.init(std.testing.allocator, database, .{}); + var query = MyModelRepository.Query.init(std.testing.allocator, poolConnector.connector(), .{}); defer query.deinit(); try query.whereIn(usize, "id", &[_]usize{1, 2}); try query.buildSql(); @@ -121,9 +125,12 @@ test "repository element retrieval" { try initDatabase(); defer database.deinit(); + var poolConnector = zrm.database.PoolConnector{ + .pool = database, + }; // Prepare a query for models. - var query = MyModelRepository.Query.init(std.testing.allocator, database, .{}); + var query = MyModelRepository.Query.init(std.testing.allocator, poolConnector.connector(), .{}); try query.whereValue(usize, "id", "=", 1); defer query.deinit(); @@ -152,8 +159,11 @@ test "repository complex SQL query" { try initDatabase(); defer database.deinit(); + var poolConnector = zrm.database.PoolConnector{ + .pool = database, + }; - var query = MyModelRepository.Query.init(std.testing.allocator, database, .{}); + var query = MyModelRepository.Query.init(std.testing.allocator, poolConnector.connector(), .{}); defer query.deinit(); query.where( try query.newCondition().@"or"(&[_]zrm.SqlParams{ @@ -187,6 +197,9 @@ test "repository element creation" { try initDatabase(); defer database.deinit(); + var poolConnector = zrm.database.PoolConnector{ + .pool = database, + }; // Create a model to insert. const newModel = MyModel{ @@ -196,7 +209,7 @@ test "repository element creation" { }; // Initialize an insert query. - var insertQuery = MyModelRepository.Insert.init(std.testing.allocator, database); + var insertQuery = MyModelRepository.Insert.init(std.testing.allocator, poolConnector.connector()); defer insertQuery.deinit(); // Insert the new model. try insertQuery.values(newModel); @@ -225,11 +238,14 @@ test "repository element update" { try initDatabase(); defer database.deinit(); + var poolConnector = zrm.database.PoolConnector{ + .pool = database, + }; // Initialize an update query. var updateQuery = MyModelRepository.Update(struct { name: []const u8, - }).init(std.testing.allocator, database); + }).init(std.testing.allocator, poolConnector.connector()); defer updateQuery.deinit(); // Update a model's name. @@ -260,6 +276,9 @@ test "model create, save and find" { try initDatabase(); defer database.deinit(); + var poolConnector = zrm.database.PoolConnector{ + .pool = database, + }; // Initialize a test model. var newModel = MyModel{ @@ -270,7 +289,7 @@ test "model create, save and find" { // Create the new model. - var result = try MyModelRepository.create(std.testing.allocator, database, &newModel); + var result = try MyModelRepository.create(std.testing.allocator, poolConnector.connector(), &newModel); defer result.deinit(); // Will clear some values in newModel. // Check that the model is correctly defined. @@ -284,7 +303,7 @@ test "model create, save and find" { // Update the model. newModel.name = "recently updated name"; - var result2 = try MyModelRepository.save(std.testing.allocator, database, &newModel); + var result2 = try MyModelRepository.save(std.testing.allocator, poolConnector.connector(), &newModel); defer result2.deinit(); // Will clear some values in newModel. // Checking that the model has been updated (but only the right field). @@ -296,7 +315,7 @@ test "model create, save and find" { // Do another update. newModel.amount = 12.226; - var result3 = try MyModelRepository.save(std.testing.allocator, database, &newModel); + var result3 = try MyModelRepository.save(std.testing.allocator, poolConnector.connector(), &newModel); defer result3.deinit(); // Will clear some values in newModel. // Checking that the model has been updated (but only the right field). @@ -306,14 +325,14 @@ test "model create, save and find" { // Try to find the created then saved model, to check that everything has been saved correctly. - var result4 = try MyModelRepository.find(std.testing.allocator, database, newModel.id); + var result4 = try MyModelRepository.find(std.testing.allocator, poolConnector.connector(), newModel.id); defer result4.deinit(); // Will clear some values in newModel. try std.testing.expectEqualDeep(newModel, result4.first().?.*); // Try to find multiple models at once. - var result5 = try MyModelRepository.find(std.testing.allocator, database, &[_]i32{1, newModel.id}); + var result5 = try MyModelRepository.find(std.testing.allocator, poolConnector.connector(), &[_]i32{1, newModel.id}); defer result5.deinit(); try std.testing.expectEqual(2, result5.models.len);