mirror of
https://github.com/deepmodeling/Uni-Lab-OS
synced 2026-03-31 13:33:07 +00:00
Merge branch 'dev' into feature/organic-extraction
This commit is contained in:
@@ -21,10 +21,21 @@ from pylabrobot.resources import (
|
||||
ResourceHolder,
|
||||
Lid,
|
||||
Trash,
|
||||
Tip,
|
||||
Tip, TubeRack,
|
||||
)
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from unilabos.devices.liquid_handling.rviz_backend import UniLiquidHandlerRvizBackend
|
||||
from unilabos.registry.placeholder_type import ResourceSlot
|
||||
from unilabos.resources.resource_tracker import (
|
||||
ResourceTreeSet,
|
||||
ResourceDict,
|
||||
EXTRA_SAMPLE_UUID,
|
||||
EXTRA_UNILABOS_SAMPLE_UUID,
|
||||
)
|
||||
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode, ROS2DeviceNode
|
||||
|
||||
|
||||
from unilabos.ros.nodes.base_device_node import BaseROS2DeviceNode
|
||||
class SimpleReturn(TypedDict):
|
||||
samples: List[List[ResourceDict]]
|
||||
volumes: List[float]
|
||||
@@ -237,12 +248,11 @@ class LiquidHandlerMiddleware(LiquidHandler):
|
||||
res_samples = []
|
||||
res_volumes = []
|
||||
for resource, volume, channel in zip(resources, vols, use_channels):
|
||||
res_samples.append(
|
||||
{"name": resource.name, "sample_uuid": resource.unilabos_extra.get("sample_uuid", None)}
|
||||
)
|
||||
sample_uuid_value = resource.unilabos_extra.get(EXTRA_SAMPLE_UUID, None)
|
||||
res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: sample_uuid_value})
|
||||
res_volumes.append(volume)
|
||||
self.pending_liquids_dict[channel] = {
|
||||
"sample_uuid": resource.unilabos_extra.get("sample_uuid", None),
|
||||
EXTRA_SAMPLE_UUID: sample_uuid_value,
|
||||
"volume": volume,
|
||||
}
|
||||
return SimpleReturn(samples=res_samples, volumes=res_volumes)
|
||||
@@ -284,10 +294,10 @@ class LiquidHandlerMiddleware(LiquidHandler):
|
||||
res_samples = []
|
||||
res_volumes = []
|
||||
for resource, volume, channel in zip(resources, vols, use_channels):
|
||||
res_uuid = self.pending_liquids_dict[channel]["sample_uuid"]
|
||||
res_uuid = self.pending_liquids_dict[channel][EXTRA_SAMPLE_UUID]
|
||||
self.pending_liquids_dict[channel]["volume"] -= volume
|
||||
resource.unilabos_extra["sample_uuid"] = res_uuid
|
||||
res_samples.append({"name": resource.name, "sample_uuid": res_uuid})
|
||||
resource.unilabos_extra[EXTRA_SAMPLE_UUID] = res_uuid
|
||||
res_samples.append({"name": resource.name, EXTRA_SAMPLE_UUID: res_uuid})
|
||||
res_volumes.append(volume)
|
||||
|
||||
return SimpleReturn(samples=res_samples, volumes=res_volumes)
|
||||
@@ -687,7 +697,52 @@ class LiquidHandlerAbstract(LiquidHandlerMiddleware):
|
||||
well.set_liquids([(liquid_name, volume)]) # type: ignore
|
||||
res_volumes.append(volume)
|
||||
|
||||
return SimpleReturn(samples=res_samples, volumes=res_volumes)
|
||||
return SetLiquidReturn(
|
||||
wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), volumes=res_volumes # type: ignore
|
||||
)
|
||||
|
||||
def set_liquid_from_plate(
|
||||
self, plate: ResourceSlot, well_names: list[str], liquid_names: list[str], volumes: list[float]
|
||||
) -> SetLiquidFromPlateReturn:
|
||||
"""Set the liquid in wells of a plate by well names (e.g., A1, A2, B3).
|
||||
|
||||
如果 liquid_names 和 volumes 为空,但 plate 和 well_names 不为空,直接返回 plate 和 wells。
|
||||
"""
|
||||
assert issubclass(plate.__class__, Plate) or issubclass(plate.__class__, TubeRack) , f"plate must be a Plate, now: {type(plate)}"
|
||||
plate: Union[Plate, TubeRack]
|
||||
# 根据 well_names 获取对应的 Well 对象
|
||||
if issubclass(plate.__class__, Plate):
|
||||
wells = [plate.get_well(name) for name in well_names]
|
||||
elif issubclass(plate.__class__, TubeRack):
|
||||
wells = [plate.get_tube(name) for name in well_names]
|
||||
res_volumes = []
|
||||
|
||||
# 如果 liquid_names 和 volumes 都为空,直接返回
|
||||
if not liquid_names and not volumes:
|
||||
return SetLiquidFromPlateReturn(
|
||||
plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore
|
||||
wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), # type: ignore
|
||||
volumes=res_volumes,
|
||||
)
|
||||
|
||||
for well, liquid_name, volume in zip(wells, liquid_names, volumes):
|
||||
well.set_liquids([(liquid_name, volume)]) # type: ignore
|
||||
res_volumes.append(volume)
|
||||
|
||||
task = ROS2DeviceNode.run_async_func(self._ros_node.update_resource, True, **{"resources": wells})
|
||||
submit_time = time.time()
|
||||
while not task.done():
|
||||
if time.time() - submit_time > 10:
|
||||
self._ros_node.lab_logger().info(f"set_liquid_from_plate {plate} 超时")
|
||||
break
|
||||
time.sleep(0.01)
|
||||
|
||||
return SetLiquidFromPlateReturn(
|
||||
plate=ResourceTreeSet.from_plr_resources([plate], known_newly_created=False).dump(), # type: ignore
|
||||
wells=ResourceTreeSet.from_plr_resources(wells, known_newly_created=False).dump(), # type: ignore
|
||||
volumes=res_volumes,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------
|
||||
# REMOVE LIQUID --------------------------------------------------
|
||||
# ---------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user