new connection pool and query check stuff

This commit is contained in:
Adam D. Ruppe 2025-11-03 19:24:52 -05:00
parent bfc0014ae2
commit 8da8d51a86
5 changed files with 639 additions and 2 deletions

View File

@ -109,6 +109,8 @@ interface Database {
import arsd.core;
Variant[] vargs;
string sql;
// FIXME: use the new helper functions sqlFromInterpolatedArgs and variantsFromInterpolatedArgs
foreach(arg; args) {
static if(is(typeof(arg) == InterpolatedLiteral!str, string str)) {
sql ~= str;
@ -123,10 +125,19 @@ interface Database {
}
return queryImpl(sql, vargs);
}
// see test/dbis.d
/// turns a systime into a value understandable by the target database as a timestamp to be concated into a query. so it should be quoted and escaped etc as necessary
string sysTimeToValue(SysTime);
/++
Return true if the connection appears to be alive
History:
Added October 30, 2025
+/
bool isAlive();
/// Prepared statement api
/*
PreparedStatement prepareStatement(string sql, int numberOfArguments);
@ -349,6 +360,519 @@ interface ResultSet {
/* deprecated */ final ResultSet byAssoc() { return this; }
}
/++
Converts a database result set to a html table, using [arsd.dom].
History:
Added October 29, 2025
+/
auto resultSetToHtmlTable()(ResultSet resultSet) {
import arsd.dom;
Table table = cast(Table) Element.make("table");
table.appendHeaderRow(resultSet.fieldNames);
foreach(row; resultSet) {
table.appendRow(row.toStringArray());
}
return table;
}
abstract class ConnectionPoolBase : arsd.core.SynchronizableObject {
protected struct DatabaseListItem {
Database db;
DatabaseListItem* nextAvailable;
}
private DatabaseListItem* firstAvailable;
// FIXME: add a connection count limit and some kind of wait mechanism for one to become available
final protected void makeAvailable(DatabaseListItem* what) {
synchronized(this) {
auto keep = this.firstAvailable;
what.nextAvailable = keep;
this.firstAvailable = what;
}
}
final protected DatabaseListItem* getNext() {
DatabaseListItem* toUse;
synchronized(this) {
if(this.firstAvailable !is null) {
toUse = this.firstAvailable;
this.firstAvailable = this.firstAvailable.nextAvailable;
}
}
return toUse;
}
}
/++
PooledConnection is an RAII holder for a database connection that is automatically recycled to the pool it came from (unless you [discard] it).
History:
Added October 29, 2025
+/
struct PooledConnection(ConnectionPoolType) {
private ConnectionPoolType.DatabaseListItem* dli;
private ConnectionPoolType pool;
private bool discarded;
private this(ConnectionPoolType.DatabaseListItem* dli, ConnectionPoolType pool) {
this.dli = dli;
this.pool = pool;
}
@disable this(this);
/++
Indicates you want the connection discarded instead of returned to the pool when you're finished with it.
You should call this if you know the connection is dead.
+/
void discard() {
this.discarded = true;
}
~this() {
import core.memory;
if(GC.inFinalizer)
return;
if(!discarded && dli.db.isAlive) {
// FIXME: a connection must not be returned to the pool unless it is both alive and idle; any pending query work would screw up the next user
// it is the user's responsibility to live with other state though like prepared statements or whatever saved per-connection.
pool.makeAvailable(dli);
}
}
/++
+/
ConnectionPoolType.DriverType borrow() @system return {
return cast(ConnectionPoolType.DriverType) dli.db; // static_cast
}
/++
+/
ResultSet rtQuery(T...)(T t) {
return dli.db.query(t);
}
/++
+/
template query(string file = __FILE__, size_t line = __LINE__, Args...) {
enum asSql = sqlFromInterpolatedArgs!(Args);
__gshared queryMetadata = new QueryMetadata!(asSql, file, line);
@(arsd.core.standalone) @system shared static this() {
ConnectionPoolType.registeredQueries_ ~= queryMetadata;
}
auto query(arsd.core.InterpolationHeader ihead, Args args, arsd.core.InterpolationFooter ifoot) {
return new QueryResult!queryMetadata(dli.db.queryImpl(asSql, variantsFromInterpolatedArgs(args)));
}
}
}
/++
+/
unittest {
import arsd.database;
shared dbPool = new shared ConnectionPool!(() => new MockDatabase())();
void main() {
auto db = dbPool.get();
foreach(row; db.query(i"SELECT * FROM test")) {
if(row.id.isNull)
continue;
auto id = row.id.get!int;
}
}
main(); // remove from docs
}
private Variant[] variantsFromInterpolatedArgs(Args...)(Args args) {
Variant[] ret;
import arsd.core;
foreach(arg; args) {
static if(is(typeof(arg) == InterpolationHeader))
{}
else
static if(is(typeof(arg) == InterpolationFooter))
{}
else
static if(is(typeof(arg) == InterpolatedLiteral!sql, string sql))
{}
else
static if(is(typeof(arg) == InterpolatedExpression!code, string code))
{}
else
static if(is(typeof(arg) == AdHocBuiltStruct!(tag, names, Values), string tag, string[] names, Values...)) {
static if(tag == "VALUES") {
foreach(value; arg.values) {
static if(is(value == sql_!code, string code)) {
// intentionally blank
} else {
ret ~= Variant(value);
}
}
} else static assert(0);
}
// FIXME: iraw and sql!"" too and VALUES
else
ret ~= Variant(arg);
}
return ret;
}
private string sqlFromInterpolatedArgs(Args...)() {
string ret;
import arsd.core;
foreach(arg; Args) {
static if(is(arg == InterpolationHeader))
{}
else
static if(is(arg == InterpolationFooter))
{}
else
static if(is(arg == InterpolatedLiteral!sql, string sql))
ret ~= sql;
else
static if(is(arg == InterpolatedExpression!code, string code))
{}
else
static if(is(arg == AdHocBuiltStruct!(tag, names, values), string tag, string[] names, values...)) {
static if(tag == "VALUES") {
ret ~= "(";
foreach(idx, name; names) {
if(idx) ret ~= ", ";
ret ~= name;
}
ret ~= ") VALUES (";
foreach(idx, value; values) {
if(idx) ret ~= ", ";
static if(is(value == sql_!code, string code)) {
ret ~= code;
} else {
ret ~= "?";
}
}
ret ~= ")";
}
else static assert(0);
}
// FIXME: iraw and sql_!"" too
else
ret ~= "?";
}
return ret;
}
/+
+/
struct AssociatedDatabaseDatum(alias queryMetadata, string name, string file, size_t line) {
@(arsd.core.standalone) @system shared static this() {
queryMetadata.registerName(name, file, line);
}
template get(T, string file = __FILE__, size_t line = __LINE__) {
shared static this() {
// FIXME: empty string and null must be distinguishable in arsd.core
static if(is(T == string))
T t = "sample";
else
T t = T.init;
queryMetadata.registerType(name, LimitedVariant(t), T.stringof, file, line);
}
T get() {
import std.conv;
return datum.toString().to!T;
}
}
DatabaseDatum datum;
bool isNull() {
return datum.isNull();
}
string toString() {
if(isNull)
return null;
else
return datum.toString();
}
alias toString this;
}
private abstract class QueryResultBase {
}
class QueryResult(alias queryMetadata) : QueryResultBase {
private ResultSet resultSet;
this(ResultSet resultSet) {
this.resultSet = resultSet;
}
QueryResultRow!queryMetadata front() {
return new QueryResultRow!queryMetadata(resultSet.front);
}
bool empty() {
return resultSet.empty;
}
void popFront() {
resultSet.popFront();
}
}
class QueryResultRow(alias queryMetadata) {
Row row;
this(Row row) {
this.row = row;
}
AssociatedDatabaseDatum!(queryMetadata, name, file, line) opDispatch(string name, string file = __FILE__, size_t line = __LINE__)() if(name != "__dtor") {
return typeof(return)(row[name]);
}
// i could support an opSlice. maybe opIndex tho it won't be CT bound checked w/o a ct!0 thing
// also want opApply which discards type check prolly and just gives you the datum.
int opApply(int delegate(string, DatabaseDatum) dg) {
string[] fn = row.resultSet.fieldNames();
foreach(idx, item; row.row)
mixin(yield("fn[idx], item"));
return 0;
}
/// ditto
int opApply(int delegate(DatabaseDatum) dg) {
foreach(item; row.row)
mixin(yield("item"));
return 0;
}
}
struct ReferencedColumn {
string name;
LimitedVariant sampleData;
string assumedType;
string actualType;
string file;
size_t line;
}
class QueryMetadataBase {
ReferencedColumn[] names;
void registerName(string name, string file, size_t line) {
names ~= ReferencedColumn(name, LimitedVariant.init, null, null, file, line);
}
void registerType(string name, LimitedVariant sample, string type, string file, size_t line) {
foreach(ref n; names)
if(n.name == name) {
if(n.assumedType.length && type.length) {
n.actualType = type;
}
n.assumedType = type;
n.sampleData = sample;
n.file = file;
n.line = line;
return;
}
names ~= ReferencedColumn(name, sample, type, type, file, line);
}
abstract string sql() const;
abstract string file() const;
abstract size_t line() const;
}
class QueryMetadata(string q, string file_, size_t line_) : QueryMetadataBase {
override string sql() const { return q; }
override string file() const { return file_; }
override size_t line() const { return line_; }
}
version(unittest)
class MockDatabase : Database {
void startTransaction() {}
string sysTimeToValue(SysTime s) { return null; }
bool isAlive() { return true; }
ResultSet queryImpl(string sql, Variant[] args...) {
return new PredefinedResultSet(null, null);
}
string escape(string sqlData) {
return null;
}
string escapeBinaryString(const(ubyte)[] sqlData) {
return null;
}
}
/++
Helpers for interpolated queries.
History:
Added October 31, 2025
See_Also:
[arsd.core.iraw]
+/
auto VALUES() {
import arsd.core;
return AdHocBuiltStruct!"VALUES"();
}
/// ditto
auto sql(string s)() {
return sql_!s();
}
private struct sql_(string s) { }
/++
A ConnectionPool manages a set of shared connections to a database.
Create one like this:
---
// at top level
shared dbPool = new shared ConnectionPool!(() => new PostgreSql("dbname=me"))();
void main() {
auto db = dbPool.get(); // in the function, get it and use it temporarily
}
---
History:
Added October 29, 2025
+/
class ConnectionPool(alias connectionFactory) : ConnectionPoolBase {
private alias unsharedThis = ConnectionPool!connectionFactory;
static if(is(typeof(connectionFactory) DriverType == return)) {
static if(!is(DriverType : Database))
static assert(0, "unusable connectionFactory - it needs to return an instance of Database");
} else {
static assert(0, "unusable connectionFactory - it needs to be a function");
}
private __gshared QueryMetadataBase[] registeredQueries_;
immutable(QueryMetadataBase[]) registeredQueries() shared {
return cast(immutable(QueryMetadataBase[])) registeredQueries_;
}
bool checkQueries()(DriverType db) shared {
bool succeeded = true;
import arsd.postgres; // FIXME is this really postgres only? looks like sqlite has no similar function... maybe make a view then sqlite3_table_column_metadata ?
static assert(is(DriverType == PostgreSql), "Only implemented for postgres right now");
int count;
import arsd.core;
import arsd.conv;
foreach(q; registeredQueries) {
//writeln(q.file, ":", q.line, " ", q.sql);
try {
try {
string dbSpecificSql;
int placeholderNumber = 1;
size_t lastCopied = 0;
foreach(idx, ch; q.sql) {
if(ch == '?') {
dbSpecificSql ~= q.sql[lastCopied .. idx];
lastCopied = idx + 1;
dbSpecificSql ~= "$" ~ to!string(placeholderNumber);
placeholderNumber++;
}
}
dbSpecificSql ~= q.sql[lastCopied .. $];
// FIXME: pipeline this
db.query("PREPARE thing_"~to!string(++count)~" AS " ~ dbSpecificSql);
} catch(Exception e) {
e.file = q.file;
e.line = q.line;
throw e;
// continue;
}
// this mysql function looks about right: https://dev.mysql.com/doc/c-api/8.0/en/mysql-stmt-result-metadata.html
// could maybe emulate by trying it in a rolled back transaction though.
auto desca = describePrepared(db,"thing_"~arsd.conv.to!string(count));
LimitedVariant[string] byName;
foreach(col; desca.result) {
byName[col.fieldName] = col.type.storage;
}
foreach(name; q.names) {
if(name.name !in byName)
throw ArsdException!"you reference unknown field"(name.name, name.file, name.line);
if(name.assumedType.length == 0)
continue;
if(byName[name.name].contains != name.sampleData.contains)
throw ArsdException!"type mismatch"(
name.name,
arsd.conv.to!string(byName[name.name].contains),
arsd.conv.to!string(name.sampleData.contains),
name.file,
name.line,
);
// i think this is redundant
if(name.assumedType.length && name.actualType.length && name.actualType != name.assumedType) {
throw ArsdException!"usage mismatch"(name.assumedType, name.actualType, name.file, name.line);
}
}
} catch(Exception e) {
writeln(e.toString());
succeeded = false;
}
}
if(!succeeded)
writeln("db check failed.");
return succeeded;
}
/++
+/
public PooledConnection!(unsharedThis) get() shared {
auto toUse = (cast(unsharedThis) this).getNext();
if(toUse is null)
toUse = new DatabaseListItem(connectionFactory());
return PooledConnection!(unsharedThis)(toUse, cast(unsharedThis) this);
}
}
class DatabaseException : Exception {
this(string msg, string file = __FILE__, size_t line = __LINE__) {
super(msg, file, line);

View File

@ -88,6 +88,10 @@ class MsSql : Database {
return null; // FIXME
}
override bool isAlive() {
return true;
}
private:
SQLHENV env;
SQLHDBC conn;

View File

@ -219,6 +219,9 @@ class MySql : Database {
query("START TRANSACTION");
}
override bool isAlive() {
return true;
}
string sysTimeToValue(SysTime s) {
return "cast('" ~ escape(s.toISOExtString()) ~ "' as datetime)";

View File

@ -60,9 +60,12 @@ class PostgreSql : Database {
conn = PQconnectdb(toStringz(connectionString));
if(conn is null)
throw new DatabaseException("Unable to allocate PG connection object");
if(PQstatus(conn) != CONNECTION_OK)
if(PQstatus(conn) != CONNECTION_OK) {
this.connectionOk = false;
throw new DatabaseException(error());
}
query("SET NAMES 'utf8'"); // D does everything with utf8
this.connectionOk = true;
}
string connectionString;
@ -75,6 +78,11 @@ class PostgreSql : Database {
return "'" ~ escape(s.toISOExtString()) ~ "'::timestamptz";
}
private bool connectionOk;
override bool isAlive() {
return connectionOk;
}
/**
Prepared statement support
@ -132,8 +140,10 @@ class PostgreSql : Database {
conn = PQconnectdb(toStringz(connectionString));
if(conn is null)
throw new DatabaseException("Unable to allocate PG connection object");
if(PQstatus(conn) != CONNECTION_OK)
if(PQstatus(conn) != CONNECTION_OK) {
this.connectionOk = false;
throw new DatabaseException(error());
}
goto retry;
}
throw new DatabaseException(error());
@ -179,6 +189,82 @@ class PostgreSql : Database {
PGconn* conn;
}
/+
# when it changes from lowercase to upper case, call that a new word. or when it goes to/from anything else and underscore or dashes.
+/
struct PreparedStatementDescription {
PreparedStatementResult[] result;
}
struct PreparedStatementResult {
string fieldName;
DatabaseDatum type;
}
PreparedStatementDescription describePrepared(PostgreSql db, string name) {
auto res = PQdescribePrepared(db.conn, name.toStringz);
PreparedStatementResult[] ret;
// PQnparams PQparamtype for params
auto numFields = PQnfields(res);
foreach(num; 0 .. numFields) {
auto typeId = PQftype(res, num);
DatabaseDatum dd;
dd.platformSpecificTag = typeId;
dd.storage = sampleForOid(typeId);
ret ~= PreparedStatementResult(
copyCString(PQfname(res, num)),
dd,
);
}
PQclear(res);
return PreparedStatementDescription(ret);
}
import arsd.core : LimitedVariant, PackedDateTime, SimplifiedUtcTimestamp;
LimitedVariant sampleForOid(int platformSpecificTag) {
switch(platformSpecificTag) {
case BOOLOID:
return LimitedVariant(false);
case BYTEAOID:
return LimitedVariant(cast(const(ubyte)[]) null);
case TEXTOID:
case VARCHAROID:
return LimitedVariant("");
case INT4OID:
return LimitedVariant(0);
case INT8OID:
return LimitedVariant(0L);
case FLOAT4OID:
return LimitedVariant(0.0f);
case FLOAT8OID:
return LimitedVariant(0.0);
case TIMESTAMPOID:
case TIMESTAMPTZOID:
return LimitedVariant(SimplifiedUtcTimestamp(0));
case DATEOID:
PackedDateTime d;
d.hasDate = true;
return LimitedVariant(d); // might want a different type so contains shows the thing without checking hasDate and hasTime
case TIMETZOID: // possibly wrong... the tz isn't in my packed thing
case TIMEOID:
PackedDateTime d;
d.hasTime = true;
return LimitedVariant(d);
case INTERVALOID:
// months, days, and microseconds
case NUMERICOID: // aka decimal
default:
// when in doubt, assume it is just a string
return LimitedVariant("sample");
}
}
private string toLowerFast(string s) {
import std.ascii : isUpper;
foreach (c; s)
@ -374,7 +460,22 @@ extern(C) {
int PQfformat(const PGresult *res, int column_number);
alias Oid = int;
enum BOOLOID = 16;
enum BYTEAOID = 17;
enum TEXTOID = 25;
enum INT4OID = 23; // integer
enum INT8OID = 20; // bigint
enum NUMERICOID = 1700;
enum FLOAT4OID = 700;
enum FLOAT8OID = 701;
enum VARCHAROID = 1043;
enum DATEOID = 1082;
enum TIMEOID = 1083;
enum TIMESTAMPOID = 1114;
enum TIMESTAMPTZOID = 1184;
enum INTERVALOID = 1186;
enum TIMETZOID = 1266;
Oid PQftype(const PGresult* res, int column_number);
char *PQescapeByteaConn(PGconn *conn,
@ -386,6 +487,7 @@ extern(C) {
char* PQcmdTuples(PGresult *res);
PGresult *PQdescribePrepared(PGconn *conn, const char *stmtName);
}
/*

View File

@ -124,6 +124,10 @@ class Sqlite : Database {
}
}
override bool isAlive() {
return true;
}
///
override void startTransaction() {
query("BEGIN TRANSACTION");