You've already forked authentication-service
mirror of
https://github.com/matrix-org/matrix-authentication-service.git
synced 2025-11-20 12:02:22 +03:00
Upgrade all Rust dependencies
This includes breaking changes of sqlx 0.7.0
This commit is contained in:
@@ -205,26 +205,29 @@ where
|
||||
.bind(id.to_string())
|
||||
.bind(job_type)
|
||||
.bind(on)
|
||||
.execute(&mut conn)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
Ok(id)
|
||||
}
|
||||
|
||||
async fn fetch_by_id(&self, job_id: &JobId) -> StorageResult<Option<JobRequest<Self::Output>>> {
|
||||
let pool = self.pool.clone();
|
||||
let mut conn = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.map_err(|e| StorageError::Connection(Box::from(e)))?;
|
||||
|
||||
let fetch_query = "SELECT * FROM apalis.jobs WHERE id = $1";
|
||||
let res: Option<SqlJobRequest<T>> = sqlx::query_as(fetch_query)
|
||||
.bind(job_id.to_string())
|
||||
.fetch_optional(&pool)
|
||||
.fetch_optional(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
Ok(res.map(Into::into))
|
||||
}
|
||||
|
||||
async fn heartbeat(&mut self, pulse: StorageWorkerPulse) -> StorageResult<bool> {
|
||||
let pool = self.pool.clone();
|
||||
match pulse {
|
||||
StorageWorkerPulse::EnqueueScheduled { count: _ } => {
|
||||
// Ideally jobs are queue via run_at. So this is not necessary
|
||||
@@ -234,7 +237,8 @@ where
|
||||
// Worker not seen in 5 minutes yet has running jobs
|
||||
StorageWorkerPulse::ReenqueueOrphaned { count } => {
|
||||
let job_type = T::NAME;
|
||||
let mut tx = pool
|
||||
let mut conn = self
|
||||
.pool
|
||||
.acquire()
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
@@ -247,7 +251,7 @@ where
|
||||
sqlx::query(query)
|
||||
.bind(job_type)
|
||||
.bind(count)
|
||||
.execute(&mut tx)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
Ok(true)
|
||||
@@ -260,16 +264,16 @@ where
|
||||
async fn kill(&mut self, worker_id: &WorkerId, job_id: &JobId) -> StorageResult<()> {
|
||||
let pool = self.pool.clone();
|
||||
|
||||
let mut tx = pool
|
||||
let mut conn = pool
|
||||
.acquire()
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
.map_err(|e| StorageError::Connection(Box::from(e)))?;
|
||||
let query =
|
||||
"UPDATE apalis.jobs SET status = 'Killed', done_at = now() WHERE id = $1 AND lock_by = $2";
|
||||
sqlx::query(query)
|
||||
.bind(job_id.to_string())
|
||||
.bind(worker_id.to_string())
|
||||
.execute(&mut tx)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
Ok(())
|
||||
@@ -280,16 +284,16 @@ where
|
||||
async fn retry(&mut self, worker_id: &WorkerId, job_id: &JobId) -> StorageResult<()> {
|
||||
let pool = self.pool.clone();
|
||||
|
||||
let mut tx = pool
|
||||
let mut conn = pool
|
||||
.acquire()
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
.map_err(|e| StorageError::Connection(Box::from(e)))?;
|
||||
let query =
|
||||
"UPDATE apalis.jobs SET status = 'Pending', done_at = NULL, lock_by = NULL WHERE id = $1 AND lock_by = $2";
|
||||
sqlx::query(query)
|
||||
.bind(job_id.to_string())
|
||||
.bind(worker_id.to_string())
|
||||
.execute(&mut tx)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
Ok(())
|
||||
@@ -343,16 +347,16 @@ where
|
||||
#[allow(clippy::disallowed_methods)]
|
||||
let run_at = Utc::now().add(wait);
|
||||
|
||||
let mut tx = pool
|
||||
let mut conn = pool
|
||||
.acquire()
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
.map_err(|e| StorageError::Connection(Box::from(e)))?;
|
||||
let query =
|
||||
"UPDATE apalis.jobs SET status = 'Pending', done_at = NULL, lock_by = NULL, lock_at = NULL, run_at = $2 WHERE id = $1";
|
||||
sqlx::query(query)
|
||||
.bind(job_id.to_string())
|
||||
.bind(run_at)
|
||||
.execute(&mut tx)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
Ok(())
|
||||
@@ -371,10 +375,10 @@ where
|
||||
let lock_at = *job.lock_at();
|
||||
let last_error = job.last_error().clone();
|
||||
|
||||
let mut tx = pool
|
||||
let mut conn = pool
|
||||
.acquire()
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
.map_err(|e| StorageError::Connection(Box::from(e)))?;
|
||||
let query =
|
||||
"UPDATE apalis.jobs SET status = $1, attempts = $2, done_at = $3, lock_by = $4, lock_at = $5, last_error = $6 WHERE id = $7";
|
||||
sqlx::query(query)
|
||||
@@ -385,7 +389,7 @@ where
|
||||
.bind(lock_at)
|
||||
.bind(last_error)
|
||||
.bind(job_id.to_string())
|
||||
.execute(&mut tx)
|
||||
.execute(&mut *conn)
|
||||
.await
|
||||
.map_err(|e| StorageError::Database(Box::from(e)))?;
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user