1
0
mirror of https://github.com/tensorchord/pgvecto.rs.git synced 2025-07-29 08:21:12 +03:00

feat: add Python bindings by psycopg 3 (#102)

* feat: support psycopg

Signed-off-by: 盐粒 Yanli <mail@yanli.one>

* chore: lint && add comment

Signed-off-by: 盐粒 Yanli <mail@yanli.one>

* test: update tests

Signed-off-by: 盐粒 Yanli <mail@yanli.one>

test: update tests

Signed-off-by: 盐粒 Yanli <mail@yanli.one>

* test: fix test of psycopg

Signed-off-by: 盐粒 Yanli <mail@yanli.one>

* chore: update readme

Signed-off-by: 盐粒 Yanli <mail@yanli.one>

* chore: write examples && modify readme

Signed-off-by: 盐粒 Yanli <mail@yanli.one>

* chore: bump version no.

Signed-off-by: 盐粒 Yanli <mail@yanli.one>

* feat: use normal defined class for Dumper

Signed-off-by: 盐粒 Yanli <mail@yanli.one>

---------

Signed-off-by: 盐粒 Yanli <mail@yanli.one>
This commit is contained in:
盐粒 Yanli
2023-10-31 16:35:55 +08:00
committed by GitHub
parent 273f79244e
commit b6472ae747
8 changed files with 316 additions and 98 deletions

View File

@ -13,6 +13,7 @@ pip install pgvecto_rs
See the usage examples:
- [SQLAlchemy](#SQLAlchemy)
- [psycopg3](#psycopg3)
### SQLAlchemy
@ -21,77 +22,25 @@ Install [SQLAlchemy](https://github.com/sqlalchemy/sqlalchemy) and [psycopg3](ht
pip install "psycopg[binary]" sqlalchemy
```
Then write your code. For example:
```python
import numpy as np
from sqlalchemy import create_engine, select, insert, types
from sqlalchemy import Integer, String
from pgvector_rs.sqlalchemy import Vector
from sqlalchemy.orm import Session, DeclarativeBase, mapped_column, Mapped
URL = "postgresql+psycopg://<...>"
# Define the ORM model
class Base(DeclarativeBase):
pass
class Document(Base):
__tablename__ = "documents"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
text: Mapped[str] = mapped_column(String)
embedding: Mapped[np.ndarray] = mapped_column(Vector(3))
def __repr__(self) -> str:
return f"{self.text}: {self.embedding}"
# Connect to the DB and create the table
engine = create_engine(URL)
Document.metadata.create_all(engine)
with Session(engine) as session:
# Insert 3 rows into the table
t1 = insert(Document).values(text="hello world", embedding=[1, 2, 3])
t2 = insert(Document).values(text="hello postgres", embedding=[1, 2, 4])
t3 = insert(Document).values(text="hello pgvecto.rs", embedding=[1, 3, 4])
for t in [t1, t2, t3]:
session.execute(t)
session.commit()
# Select the row "hello pgvecto.rs"
stmt = select(Document).where(Document.text == "hello pgvecto.rs")
target = session.scalar(stmt)
# Select all the rows and sort them
# by the squared_euclidean_distance to "hello pgvecto.rs"
stmt = select(
Document.text,
Document.embedding.squared_euclidean_distance(target.embedding).label(
"distance"
),
).order_by("distance")
for doc in session.execute(stmt):
print(doc)
# Drop the table
Document.metadata.drop_all(engine)
```
The output will be:
```
('hello pgvecto.rs', 0.0)
('hello postgres', 1.0)
('hello world', 2.0)
```
Then write your code. See [examples/sqlalchemy_example.py](examples/sqlalchemy_example.py) and [tests/test_sqlalchemy.py](tests/test_sqlalchemy.py) for example.
All the operators include:
- `squared_euclidean_distance`
- `negative_dot_product_distance`
- `negative_cosine_distance`
### psycopg3
Install [psycopg3](https://www.psycopg.org/psycopg3/docs/basic/install.html)
```bash
pip install "psycopg[binary]"
```
Then write your code. See [examples/psycopg_example.py](examples/psycopg_example.py) and [tests/test_psycopg.py](tests/test_psycopg.py) for example.
Known issue:
- Can not check the length of an vector when inserting it into a table. See: [#96](https://github.com/tensorchord/pgvecto.rs/issues/96).
## Development
This package is managed by [PDM](https://pdm.fming.dev).

View File

@ -0,0 +1,61 @@
import os
import psycopg
import numpy as np
from pgvecto_rs.psycopg import register_vector
URL = "postgresql://{username}:{password}@{host}:{port}/{db_name}".format(
port=os.getenv("DB_PORT", 5432),
host=os.getenv("DB_HOST", "localhost"),
username=os.getenv("DB_USER", "postgres"),
password=os.getenv("DB_PASS", "mysecretpassword"),
db_name=os.getenv("DB_NAME", "postgres"),
)
# Connect to the DB and init things
with psycopg.connect(URL) as conn:
conn.execute("CREATE EXTENSION IF NOT EXISTS vectors;")
register_vector(conn)
conn.execute(
"CREATE TABLE documents (id SERIAL PRIMARY KEY, text TEXT NOT NULL, embedding vector(3) NOT NULL);"
)
conn.commit()
try:
# Insert 3 rows into the table
conn.execute(
"INSERT INTO documents (text, embedding) VALUES (%s, %s);",
("hello world", [1, 2, 3]),
)
conn.execute(
"INSERT INTO documents (text, embedding) VALUES (%s, %s);",
("hello postgres", [1.0, 2.0, 4.0]),
)
conn.execute(
"INSERT INTO documents (text, embedding) VALUES (%s, %s);",
("hello pgvecto.rs", np.array([1, 3, 4])),
)
conn.commit()
# Select the row "hello pgvecto.rs"
cur = conn.execute(
"SELECT * FROM documents WHERE text = %s;", ("hello pgvecto.rs",)
)
target = cur.fetchone()[2]
# Select all the rows and sort them
# by the squared_euclidean_distance to "hello pgvecto.rs"
cur = conn.execute(
"SELECT text, embedding <-> %s AS distance FROM documents ORDER BY distance;",
(target,),
)
for row in cur.fetchall():
print(row)
# The output will be:
# ```
# ('hello pgvecto.rs', 0.0)
# ('hello postgres', 1.0)
# ('hello world', 2.0)
# ```
finally:
# Drop the table
conn.execute("DROP TABLE IF EXISTS documents;")
conn.commit()

View File

@ -0,0 +1,69 @@
import os
import numpy as np
from sqlalchemy import create_engine, select, insert
from sqlalchemy import Integer, String
from pgvecto_rs.sqlalchemy import Vector
from sqlalchemy.orm import Session, DeclarativeBase, mapped_column, Mapped
URL = "postgresql+psycopg://{username}:{password}@{host}:{port}/{db_name}".format(
port=os.getenv("DB_PORT", 5432),
host=os.getenv("DB_HOST", "localhost"),
username=os.getenv("DB_USER", "postgres"),
password=os.getenv("DB_PASS", "mysecretpassword"),
db_name=os.getenv("DB_NAME", "postgres"),
)
# Define the ORM model
class Base(DeclarativeBase):
pass
class Document(Base):
__tablename__ = "documents"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
text: Mapped[str] = mapped_column(String)
embedding: Mapped[np.ndarray] = mapped_column(Vector(3))
def __repr__(self) -> str:
return f"{self.text}: {self.embedding}"
# Connect to the DB and create the table
engine = create_engine(URL)
Document.metadata.create_all(engine)
with Session(engine) as session:
# Insert 3 rows into the table
t1 = insert(Document).values(text="hello world", embedding=[1, 2, 3])
t2 = insert(Document).values(text="hello postgres", embedding=[1.0, 2.0, 4.0])
t3 = insert(Document).values(text="hello pgvecto.rs", embedding=np.array([1, 3, 4]))
for t in [t1, t2, t3]:
session.execute(t)
session.commit()
# Select the row "hello pgvecto.rs"
stmt = select(Document).where(Document.text == "hello pgvecto.rs")
target = session.scalar(stmt)
# Select all the rows and sort them
# by the squared_euclidean_distance to "hello pgvecto.rs"
stmt = select(
Document.text,
Document.embedding.squared_euclidean_distance(target.embedding).label(
"distance"
),
).order_by("distance")
for doc in session.execute(stmt):
print(doc)
# The output will be:
# ```
# ('hello pgvecto.rs', 0.0)
# ('hello postgres', 1.0)
# ('hello world', 2.0)
# ```
# Drop the table
Document.metadata.drop_all(engine)

View File

@ -1,6 +1,6 @@
[project]
name = "pgvecto_rs"
version = "0.1.1"
version = "0.1.2"
description = "Python binding for pgvecto.rs"
authors = [
{name = "TensorChord", email = "envd-maintainers@tensorchord.ai"},

View File

@ -0,0 +1,48 @@
from psycopg import Connection, ProgrammingError
from psycopg.adapt import Loader, Dumper
from psycopg.pq import Format
from psycopg.types import TypeInfo
from numpy import ndarray
from ..utils.serializer import from_db_str, to_db_str
__all__ = ["register_vector"]
class VectorDumper(Dumper):
format = Format.TEXT
def dump(self, obj):
return to_db_str(obj).encode("utf8")
class VectorLoader(Loader):
format = Format.TEXT
def load(self, data):
if isinstance(data, memoryview):
data = bytes(data)
return from_db_str(data.decode("utf8"))
def register_vector(context: Connection):
info = TypeInfo.fetch(context, "vector")
register_vector_info(context, info)
async def register_vector_async(context: Connection):
info = await TypeInfo.fetch(context, "vector")
register_vector_info(context, info)
def register_vector_info(context: Connection, info: TypeInfo):
if info is None:
raise ProgrammingError("vector type not found in the database")
info.register(context)
class VectorTextDumper(VectorDumper):
oid = info.oid
adapters = context.adapters
adapters.register_dumper(list, VectorTextDumper)
adapters.register_dumper(ndarray, VectorTextDumper)
adapters.register_loader(info.oid, VectorLoader)

View File

@ -22,38 +22,18 @@ URL = "postgresql://{username}:{password}@{host}:{port}/{db_name}".format(
# ==== test_create_index ====
TOML_SETTINGS = {
"flat": "$${}$$".format(
toml.dumps(
{
"capacity": 2097152,
"algorithm": {"flat": {}},
}
)
"flat": toml.dumps(
{
"capacity": 2097152,
"algorithm": {"flat": {}},
}
),
"hnsw": "$${}$$".format(
toml.dumps(
{
"capacity": 2097152,
"algorithm": {"hnsw": {}},
}
)
"hnsw": toml.dumps(
{
"capacity": 2097152,
"algorithm": {"hnsw": {}},
}
),
# "ivf": "$${}$$".format(
# toml.dumps(
# {
# "capacity": 2097152,
# "algorithm": {"ivf": {}},
# }
# )
# ),
# "vamana": "$${}$$".format(
# toml.dumps(
# {
# "capacity": 2097152,
# "algorithm": {"vamana": {}},
# }
# )
# ),
}
# ==== test_invalid_insert ====

View File

@ -0,0 +1,111 @@
import pytest
import psycopg
import numpy as np
from psycopg import Connection, sql
from pgvecto_rs.psycopg import register_vector
from tests import (
URL,
TOML_SETTINGS,
VECTORS,
OP_SQRT_EUCLID_DIS,
OP_NEG_DOT_PROD_DIS,
OP_NEG_COS_DIS,
EXPECTED_SQRT_EUCLID_DIS,
EXPECTED_NEG_DOT_PROD_DIS,
EXPECTED_NEG_COS_DIS,
LEN_AFT_DEL,
)
@pytest.fixture(scope="module")
def conn():
with psycopg.connect(URL) as conn:
conn.execute("CREATE EXTENSION IF NOT EXISTS vectors;")
register_vector(conn)
conn.execute("DROP TABLE IF EXISTS tb_test_item;")
conn.execute(
"CREATE TABLE tb_test_item (id bigserial PRIMARY KEY, embedding vector(3) NOT NULL);"
)
conn.commit()
try:
yield conn
finally:
conn.execute("DROP TABLE IF EXISTS tb_test_item;")
conn.commit()
@pytest.mark.parametrize("index_name,index_setting", TOML_SETTINGS.items())
def test_create_index(conn: Connection, index_name: str, index_setting: str):
stat = sql.SQL(
"CREATE INDEX {} ON tb_test_item USING vectors (embedding l2_ops) WITH (options={});",
).format(sql.Identifier(index_name), index_setting)
conn.execute(stat)
conn.commit()
# The server cannot handle invalid vectors curently, see https://github.com/tensorchord/pgvecto.rs/issues/96
# def test_invalid_insert(conn: Connection):
# for i, e in enumerate(INVALID_VECTORS):
# try:
# conn.execute("INSERT INTO tb_test_item (embedding) VALUES (%s);", (e, ) )
# pass
# except:
# conn.rollback()
# else:
# conn.rollback()
# raise AssertionError(
# 'failed to raise invalid value error for {}th vector {}'
# .format(i, e),
# )
# =================================
# Semetic search tests
# =================================
def test_insert(conn: Connection):
with conn.cursor() as cur:
cur.executemany(
"INSERT INTO tb_test_item (embedding) VALUES (%s);", [(e,) for e in VECTORS]
)
cur.execute("SELECT * FROM tb_test_item;")
conn.commit()
rows = cur.fetchall()
assert len(rows) == len(VECTORS)
for i, e in enumerate(rows):
assert np.allclose(e[1], VECTORS[i], atol=1e-10)
def test_squared_euclidean_distance(conn: Connection):
cur = conn.execute(
"SELECT embedding <-> %s FROM tb_test_item;", (OP_SQRT_EUCLID_DIS,)
)
for i, row in enumerate(cur.fetchall()):
assert np.allclose(EXPECTED_SQRT_EUCLID_DIS[i], row[0], atol=1e-10)
def test_negative_dot_product_distance(conn: Connection):
cur = conn.execute(
"SELECT embedding <#> %s FROM tb_test_item;", (OP_NEG_DOT_PROD_DIS,)
)
for i, row in enumerate(cur.fetchall()):
assert np.allclose(EXPECTED_NEG_DOT_PROD_DIS[i], row[0], atol=1e-10)
def test_negative_cosine_distance(conn: Connection):
cur = conn.execute("SELECT embedding <=> %s FROM tb_test_item;", (OP_NEG_COS_DIS,))
for i, row in enumerate(cur.fetchall()):
assert np.allclose(EXPECTED_NEG_COS_DIS[i], row[0], atol=1e-10)
# # =================================
# # Suffix functional tests
# # =================================
def test_delete(conn: Connection):
conn.execute("DELETE FROM tb_test_item WHERE embedding = %s;", (VECTORS[0],))
conn.commit()
cur = conn.execute("SELECT * FROM tb_test_item;")
assert len(cur.fetchall()) == LEN_AFT_DEL

View File

@ -69,11 +69,11 @@ def test_create_index(session: Session, index_name: str, index_setting: str):
index_name,
Document.embedding,
postgresql_using="vectors",
postgresql_with={"options": index_setting},
postgresql_with={"options": f"$${index_setting}$$"},
postgresql_ops={"embedding": "l2_ops"},
)
index.create(session.bind)
session.rollback()
session.commit()
@pytest.mark.parametrize("i,e", enumerate(INVALID_VECTORS))