CONTEXT I have set up a Neo4j vector index in combination with a Cypher query to fetch specific nodes and traverse through their paths to return specific data. Given this data will be passed to an LLM for summarisation, the output should be as clean as possible.
FUNCTION `generate_employee_cypher_query(schema)`: fetches specific paths from the entity `Employee` by following the schema/ontology paths:
def generate_employee_cypher_query(schema):
    """Build a Cypher query that vector-searches ``Employee`` nodes and
    traverses the relationship paths declared in the schema/ontology.

    Parameters
    ----------
    schema : dict
        Ontology description. ``schema["Employee"]["relationships"]`` must
        be a list of dicts with keys ``type``, ``startNode`` and
        ``endNode``, plus an optional ``cardinality`` (defaults to
        ``"0..n"``).

    Returns
    -------
    str
        A Cypher query expecting the parameters ``$index_name``,
        ``$n_results``, ``$query_embedding`` and ``$user_id``.
    """
    employee_schema = schema["Employee"]
    match_clauses = []
    return_blocks = []

    # Node properties stripped before serialisation so the LLM only sees
    # user-facing data.  Defined once instead of repeating the literal in
    # every generated block (the original duplicated it three times).
    excluded_props = (
        "['embedding', 'id', 'elementId', 'user_id', "
        "'timestamp', 'created', 'updated']"
    )

    # Initial vector search over the Employee index, scoped to one user.
    base_query = """
CALL db.index.vector.queryNodes($index_name, $n_results, $query_embedding)
YIELD node AS employee, score
WHERE employee:Employee AND employee.user_id = $user_id
MATCH (employee)
"""

    # Generate OPTIONAL MATCH clauses from the schema relationships.
    for rel in employee_schema["relationships"]:
        rel_type = rel["type"]
        end_node = rel["endNode"]
        alias = rel_type.lower()  # Cypher variable for the related node
        if rel["startNode"] == "Employee":
            # Outgoing relationship from Employee.
            match_clauses.append(
                f"OPTIONAL MATCH (employee)-[{alias}Rel:{rel_type}]->"
                f"({alias}:{end_node})"
            )
            # (Un)availability nodes hang their dates off a TimeFrame node.
            if end_node in ("Availability", "Unavailability"):
                match_clauses.append(
                    f"OPTIONAL MATCH ({alias})-[:HAS_TIMEFRAME]->"
                    f"({alias}TimeFrame:TimeFrame)"
                )
        else:
            # Incoming relationship to Employee.
            match_clauses.append(
                f"OPTIONAL MATCH ({alias}:{end_node})"
                f"-[{alias}Rel:{rel_type}]->(employee)"
            )

    # Employee-level columns: name, similarity score and a cleaned JSON
    # rendering of the node's own properties.
    return_blocks.append(f"""
employee.name AS employeeName,
score,
apoc.convert.toJson(
CASE WHEN employee IS NOT NULL
THEN apoc.map.removeKeys(properties(employee),
{excluded_props}
)
ELSE {{}}
END
) AS employeeJson,
""")

    # Start connections array.
    return_blocks.append("apoc.convert.toJson([")

    # One block per relationship inside the connections array.
    connection_blocks = []
    for rel in employee_schema["relationships"]:
        rel_type = rel["type"]
        end_node = rel["endNode"]
        alias = rel_type.lower()
        cardinality = rel.get("cardinality", "0..n")
        # A to-one relationship collapses the collected list to its head.
        is_single = cardinality in ("1..1", "0..1")
        collection_suffix = "[0]" if is_single else ""

        if end_node in ("Availability", "Unavailability"):
            # Special handling: project the nested TimeFrame explicitly.
            connection_blocks.append(f"""{{
type: '{rel_type}',
{alias}: collect(DISTINCT CASE WHEN {alias} IS NOT NULL
THEN {{
employeeName: {alias}.employeeName,
timeframe: CASE WHEN {alias}TimeFrame IS NOT NULL
THEN {{
dateIndicator: {alias}TimeFrame.dateIndicator,
type: {alias}TimeFrame.type,
recurring: {alias}TimeFrame.recurring
}}
ELSE null
END
}}
ELSE null END){collection_suffix}
}}""")
        else:
            # Standard relationship: dump the node's cleaned properties.
            connection_blocks.append(f"""{{
type: '{rel_type}',
{end_node.lower()}: collect(DISTINCT CASE WHEN {alias} IS NOT NULL
THEN apoc.map.removeKeys(properties({alias}),
{excluded_props}
)
ELSE null END){collection_suffix}
}}""")

    # Close connections array.
    return_blocks.append(",\n".join(connection_blocks))
    return_blocks.append("]) AS connectionsJson")

    # Combine all query parts.
    return (
        base_query
        + "\n".join(match_clauses)
        + "\nRETURN "
        + "\n".join(return_blocks)
    )
Now, the output of this specific function can be seen in the example below:
Employee Vector Search Results:
[
{
"employeeName": "Emma Williams",
"score": 0.6321649551391602,
"employee": {
"name": "Emma Williams",
"email": "[email protected]"
},
"connections": [
{
"contract": {
"contractType": "Part-time"
},
"type": "HAS_CONTRACT_TYPE"
},
{
"has_unavailability": [],
"type": "HAS_UNAVAILABILITY"
},
{
"has_availability": [
{
"employeeName": "Emma Williams",
"timeframe": {
"recurring": true,
"dateIndicator": "Thu-Sat 16:00-00:00",
"type": "DayOfWeek"
}
}
],
"type": "HAS_AVAILABILITY"
},
{
"team": {
"name": "F&B"
},
"type": "BELONGS_TO"
},
{
"education": [],
"type": "HAS_EDUCATION"
},
{
"type": "HAS_CERTIFICATION",
"certification": [
{
"name": "Alcohol Service"
},
{
"name": "Mixology Certificate"
}
]
},
Notice in the above output the empty lists, e.g. `"education": []` or `"has_unavailability": []`. I want those entries not to be included in the output.
thank you in advance Manuel
I tried using CASE expressions or apoc.map.clean but did not seem to lead to the results i wanted.
CONTEXT I have set up a Neo4j vector index in combination with a Cypher query to fetch specific nodes and traverse through their paths to return specific data. Given this data will be passed to an LLM for summarisation, the output should be as clean as possible.
FUNCTION `generate_employee_cypher_query(schema)`: fetches specific paths from the entity `Employee` by following the schema/ontology paths:
def generate_employee_cypher_query(schema):
    """Assemble the Cypher text for an Employee vector search.

    Starting from the vector-index lookup, the query OPTIONAL-MATCHes every
    relationship declared in ``schema["Employee"]["relationships"]`` and
    returns the employee's cleaned properties plus a JSON array describing
    each connection.  Parameters expected at run time: ``$index_name``,
    ``$n_results``, ``$query_embedding`` and ``$user_id``.
    """
    relationships = schema["Employee"]["relationships"]

    # Vector-index entry point, restricted to the caller's user_id.
    vector_search = """
CALL db.index.vector.queryNodes($index_name, $n_results, $query_embedding)
YIELD node AS employee, score
WHERE employee:Employee AND employee.user_id = $user_id
MATCH (employee)
"""

    # One OPTIONAL MATCH per schema relationship.
    matches = []
    for relationship in relationships:
        name = relationship["type"]
        label = relationship["endNode"]
        var = name.lower()
        if relationship["startNode"] != "Employee":
            # Relationship pointing at the employee.
            matches.append(
                f"OPTIONAL MATCH ({var}:{label})"
                f"-[{var}Rel:{name}]->(employee)"
            )
            continue
        # Relationship leaving the employee.
        matches.append(
            f"OPTIONAL MATCH (employee)-[{var}Rel:{name}]->"
            f"({var}:{label})"
        )
        if label in ("Availability", "Unavailability"):
            # These nodes carry their dates on a separate TimeFrame node.
            matches.append(
                f"OPTIONAL MATCH ({var})-[:HAS_TIMEFRAME]->"
                f"({var}TimeFrame:TimeFrame)"
            )

    # Employee columns plus the opening of the connections array.
    selections = ["""
employee.name AS employeeName,
score,
apoc.convert.toJson(
CASE WHEN employee IS NOT NULL
THEN apoc.map.removeKeys(properties(employee),
['embedding', 'id', 'elementId', 'user_id', 'timestamp', 'created', 'updated']
)
ELSE {}
END
) AS employeeJson,
""", "apoc.convert.toJson(["]

    # Build one JSON object per relationship for the connections array.
    per_relationship = []
    for relationship in relationships:
        name = relationship["type"]
        label = relationship["endNode"]
        var = name.lower()
        # "[0]" unwraps the collected list for to-one relationships.
        suffix = (
            "[0]"
            if relationship.get("cardinality", "0..n") in ("1..1", "0..1")
            else ""
        )
        if label in ("Availability", "Unavailability"):
            # Project the nested TimeFrame fields explicitly.
            per_relationship.append(f"""{{
type: '{name}',
{var}: collect(DISTINCT CASE WHEN {var} IS NOT NULL
THEN {{
employeeName: {var}.employeeName,
timeframe: CASE WHEN {var}TimeFrame IS NOT NULL
THEN {{
dateIndicator: {var}TimeFrame.dateIndicator,
type: {var}TimeFrame.type,
recurring: {var}TimeFrame.recurring
}}
ELSE null
END
}}
ELSE null END){suffix}
}}""")
        else:
            # Plain relationship: emit the node's cleaned property map.
            per_relationship.append(f"""{{
type: '{name}',
{label.lower()}: collect(DISTINCT CASE WHEN {var} IS NOT NULL
THEN apoc.map.removeKeys(properties({var}),
['embedding', 'id', 'elementId', 'user_id', 'timestamp', 'created', 'updated']
)
ELSE null END){suffix}
}}""")

    selections.append(",\n".join(per_relationship))
    selections.append("]) AS connectionsJson")

    return (
        vector_search
        + "\n".join(matches)
        + "\nRETURN "
        + "\n".join(selections)
    )
Now, the output of this specific function can be seen in the example below:
Employee Vector Search Results:
[
{
"employeeName": "Emma Williams",
"score": 0.6321649551391602,
"employee": {
"name": "Emma Williams",
"email": "[email protected]"
},
"connections": [
{
"contract": {
"contractType": "Part-time"
},
"type": "HAS_CONTRACT_TYPE"
},
{
"has_unavailability": [],
"type": "HAS_UNAVAILABILITY"
},
{
"has_availability": [
{
"employeeName": "Emma Williams",
"timeframe": {
"recurring": true,
"dateIndicator": "Thu-Sat 16:00-00:00",
"type": "DayOfWeek"
}
}
],
"type": "HAS_AVAILABILITY"
},
{
"team": {
"name": "F&B"
},
"type": "BELONGS_TO"
},
{
"education": [],
"type": "HAS_EDUCATION"
},
{
"type": "HAS_CERTIFICATION",
"certification": [
{
"name": "Alcohol Service"
},
{
"name": "Mixology Certificate"
}
]
},
Notice in the above output the empty lists, e.g. `"education": []` or `"has_unavailability": []`. I want those entries not to be included in the output.
thank you in advance Manuel
I tried using CASE expressions or apoc.map.clean but did not seem to lead to the results i wanted.
Share Improve this question edited Nov 22, 2024 at 3:47 cybersam 67.1k6 gold badges57 silver badges80 bronze badges asked Nov 21, 2024 at 4:27 Manuel Maguga DarbinianManuel Maguga Darbinian 32 bronze badges2 Answers
While it would be possible to do that by changing your Python and Cypher code, it would be much simpler to just modify the JSON Employee Vector Search Results
.
For example, here is a sample function that takes your Employee Vector Search Results
and filters out, from every employee's connections
list, all objects that have a type
key and another key whose value is an empty list:
def filter_employee_connections(json_result):
    """Drop connection entries whose only payload is an empty list.

    Parses *json_result* (a JSON array of employee records), removes from
    each record's ``connections`` list every object that carries an empty
    list under any key other than ``type``, and returns the data
    re-serialised as pretty-printed JSON.
    """
    records = json.loads(json_result)

    def keep(connection):
        # A connection survives unless some non-"type" value is an empty list.
        for key, value in connection.items():
            if key != "type" and isinstance(value, list) and not value:
                return False
        return True

    for record in records:
        record["connections"] = [c for c in record["connections"] if keep(c)]

    return json.dumps(records, indent=2)
The resulting JSON would look something like this:
[
{
"employeeName": "Emma Williams",
"score": 0.6321649551391602,
"employee": {
"name": "Emma Williams",
"email": "[email protected]"
},
"connections": [
{
"contract": {
"contractType": "Part-time"
},
"type": "HAS_CONTRACT_TYPE"
},
{
"has_availability": [
{
"employeeName": "Emma Williams",
"timeframe": {
"recurring": true,
"dateIndicator": "Thu-Sat 16:00-00:00",
"type": "DayOfWeek"
}
}
],
"type": "HAS_AVAILABILITY"
},
{
"team": {
"name": "F&B"
},
"type": "BELONGS_TO"
},
{
"type": "HAS_CERTIFICATION",
"certification": [
{
"name": "Alcohol Service"
},
{
"name": "Mixology Certificate"
}
]
}
]
}
]
Thanks a lot for the suggestion! it was indeed much simpler to just modify the JSON Employee Vector Search and the other vector index Results
I implemented a solution that builds upon your approach while adding some additional robustness to handle different types of empty values in my Neo4j vector search results.
def filter_empty_connections(results):
    """
    Filter out empty connections from vector search results across all data
    types (Employee, Work, Project, WorkAllocation, WorkScheduleRule).

    *results* is expected to be a dict with a ``"results"`` list; any other
    (falsy or keyless) input is returned untouched.  A connection entry is
    dropped when every field other than ``"type"`` is "empty": None, "",
    an empty list/dict, or a container holding only such empty values.
    """
    if not results or "results" not in results:
        return results

    def _is_empty(value):
        # Recursively treat containers of empty values as empty themselves;
        # note that 0 and False deliberately count as non-empty data.
        if isinstance(value, list):
            return not value or all(_is_empty(item) for item in value)
        if isinstance(value, dict):
            return not value or all(_is_empty(v) for v in value.values())
        return value is None or value == ""

    def _prune(item):
        # Keep a connection only if it has at least one non-empty payload
        # field besides its "type" marker.
        if "connections" in item:
            item["connections"] = [
                conn
                for conn in item["connections"]
                if any(k != "type" and not _is_empty(v) for k, v in conn.items())
            ]
        return item

    return {"results": [_prune(item) for item in results["results"]]}
Then I integrated it into my index functions like this:
def employee_details_index(query, query_embedding, n_results=50):
    """Run the employee vector-index search and return filtered results.

    NOTE(review): ``results`` comes from the elided query-execution code
    below — this snippet assumes it yields rows exposing the columns
    produced by ``generate_employee_cypher_query`` (``employeeName``,
    ``score``, ``employeeJson``, ``connectionsJson``).
    """
    # ... existing query execution code ...
    structured_results = [
        {
            "employeeName": row["employeeName"],
            "score": row["score"],
            "employee": json.loads(row["employeeJson"]),
            "connections": json.loads(row["connectionsJson"]),
        }
        for row in results
    ]
    # Strip empty connection entries before handing the data to the LLM.
    return filter_empty_connections({"results": structured_results})
This approach successfully removed empty connections like "education": [] and "has_unavailability": [] from the results while keeping the connection entries that had actual data.
Thank you again for pointing me in the right direction! This solution worked perfectly for my use case.