以下の内容はhttps://seinzumtode.hatenadiary.jp/entry/2025/09/23/225214より取得しました。


ROS2でステートマシンをnav2と一緒に使う

py_trees, py_trees_rosが使えそう。
以下のコードを試す(AI生成コード)

#!/usr/bin/env python3
import rclpy
import py_trees
import py_trees_ros
from py_trees.common import ParallelPolicy
from geometry_msgs.msg import PoseStamped, Twist
from nav_msgs.msg import Odometry
from nav2_msgs.action import NavigateToPose
import sys

def print_same_line(msg1, msg2):
    sys.stdout.write("\033[F\033[F")  # カーソルを2行上に移動
    sys.stdout.write(f"{msg1}\n")
    sys.stdout.write(f"{msg2}\n")
    sys.stdout.flush()

# ---- Task 1 & Task 2 ----
class PrintHello(py_trees.behaviour.Behaviour):
    def __init__(self, name="PrintHello"):
        super().__init__(name)
        self.executed = False

    def update(self):
        if not self.executed:
            print("hello")
            self.executed = True
        return py_trees.common.Status.SUCCESS


class PrintHi(py_trees.behaviour.Behaviour):
    def __init__(self, name="PrintHi"):
        super().__init__(name)
        self.executed = False

    def update(self):
        if not self.executed:
            print("hi")
            self.executed = True
        return py_trees.common.Status.SUCCESS


class MoveToPosition(py_trees.behaviour.Behaviour):
    # Class variables to share origin and odom across all instances
    origin_x = None
    origin_y = None
    origin_yaw = None  # Store initial orientation
    origin_samples = []  # Store multiple samples to ensure robot is stationary
    origin_sample_count = 10  # Increased samples for better stability
    origin_locked = False  # Prevent origin from being reset once established

    # Shared odom data
    global_x = 0.0
    global_y = 0.0
    global_yaw = 0.0
    odom_initialized = False
    odom_sub = None  # Single subscription shared across instances

    def __init__(self, name, target_x, target_y, tolerance=1.0):
        super().__init__(name)
        self.target_x = target_x
        self.target_y = target_y
        self.cmd_vel_pub = None
        self.completed = False
        self.tolerance = tolerance  # Configurable tolerance

    def setup(self, **kwargs):
        node = kwargs.get('node')
        if node:
            self.cmd_vel_pub = node.create_publisher(Twist, '/cmd_vel', 10)
            # Create single shared odom subscription
            if MoveToPosition.odom_sub is None:
                MoveToPosition.odom_sub = node.create_subscription(
                    Odometry, '/diff_drive_base_controller/odom', MoveToPosition.shared_odom_callback, 10)
                print(f"Shared odom subscription created")
            print(f"{self.name}: Setup completed")

    @classmethod
    def shared_odom_callback(cls, msg):
        import math
        raw_x = msg.pose.pose.position.x
        raw_y = msg.pose.pose.position.y

        # Extract yaw from quaternion
        orientation = msg.pose.pose.orientation
        siny_cosp = 2 * (orientation.w * orientation.z + orientation.x * orientation.y)
        cosy_cosp = 1 - 2 * (orientation.y * orientation.y + orientation.z * orientation.z)
        raw_yaw = math.atan2(siny_cosp, cosy_cosp)

        # Set initial position and orientation as origin only when robot is stationary
        # Once origin is locked, never change it again
        if not cls.origin_locked and cls.origin_x is None:
            # Collect samples to ensure robot is stationary
            cls.origin_samples.append((raw_x, raw_y, raw_yaw))

            if len(cls.origin_samples) >= cls.origin_sample_count:
                # Check if all samples are close to each other (robot is stationary)
                samples = cls.origin_samples
                x_values = [s[0] for s in samples]
                y_values = [s[1] for s in samples]
                yaw_values = [s[2] for s in samples]

                x_range = max(x_values) - min(x_values)
                y_range = max(y_values) - min(y_values)
                yaw_range = max(yaw_values) - min(yaw_values)

                if x_range < 0.005 and y_range < 0.005 and yaw_range < 0.02:  # Very strict tolerance
                    cls.origin_x = sum(x_values) / len(x_values)
                    cls.origin_y = sum(y_values) / len(y_values)
                    cls.origin_yaw = sum(yaw_values) / len(yaw_values)
                    cls.origin_locked = True  # Lock the origin permanently
                    print(f"ORIGIN LOCKED at stationary pose: ({cls.origin_x:.3f}, {cls.origin_y:.3f}, {cls.origin_yaw:.3f})")
                else:
                    # Robot is moving, clear samples and wait for it to stop
                    print("Robot is moving, waiting for it to stop before setting origin...")
                    cls.origin_samples = []

        # Only process coordinates if origin is locked (permanently established)
        if cls.origin_locked and cls.origin_x is not None:
            # Calculate relative position with rotation compensation
            # Transform to origin-centered coordinates
            dx = raw_x - cls.origin_x
            dy = raw_y - cls.origin_y

            # Rotate coordinates to align with initial orientation
            cos_yaw = math.cos(-cls.origin_yaw)
            sin_yaw = math.sin(-cls.origin_yaw)

            cls.global_x = dx * cos_yaw - dy * sin_yaw
            cls.global_y = dx * sin_yaw + dy * cos_yaw
            cls.global_yaw = raw_yaw - cls.origin_yaw

            cls.odom_initialized = True  # Mark that we received odom data
            # Only print from first instance to reduce spam
        else:
            pass  # Reduced logging

    def update(self):
        import math

        # If already completed, don't run again
        if self.completed:
            return py_trees.common.Status.SUCCESS

        # Wait for origin to be locked and odom data to be ready
        if not MoveToPosition.origin_locked or not MoveToPosition.odom_initialized:
            print(f"{self.name}: Waiting for origin to be LOCKED and odom data...")
            return py_trees.common.Status.RUNNING

        # Get current position from shared data
        current_x = MoveToPosition.global_x
        current_y = MoveToPosition.global_y

        # Calculate distance to target
        dx = self.target_x - current_x
        dy = self.target_y - current_y
        distance = math.sqrt(dx*dx + dy*dy)

        # print(f"{self.name}: Current({current_x:.2f}, {current_y:.2f}) -> Target({self.target_x}, {self.target_y}), Distance: {distance:.2f}m")
        distance_msg = f"{self.name}: Current({current_x:.2f}, {current_y:.2f}) -> Target({self.target_x}, {self.target_y}), Distance: {distance:.2f}m"

        # Check if reached target
        if distance < self.tolerance:
            # Stop and complete
            twist = Twist()  # All velocities = 0
            if self.cmd_vel_pub:
                self.cmd_vel_pub.publish(twist)
                # Publish stop command multiple times to ensure it's received
                import time
                time.sleep(0.1)
                self.cmd_vel_pub.publish(twist)
            print(f"{self.name}: Reached target position! Final distance: {distance:.2f}m")
            self.completed = True
            return py_trees.common.Status.SUCCESS

        # Get current robot orientation
        current_yaw = MoveToPosition.global_yaw

        # Calculate desired angle to target
        angle_to_target = math.atan2(dy, dx)

        # Calculate angular error (difference between desired and current orientation)
        angular_error = angle_to_target - current_yaw

        # Normalize angular error to [-pi, pi] (shortest rotation)
        while angular_error > math.pi:
            angular_error -= 2 * math.pi
        while angular_error < -math.pi:
            angular_error += 2 * math.pi

        # Improved proportional controller
        twist = Twist()

        # Only move forward if roughly facing the right direction
        if abs(angular_error) < math.pi/4:  # Wiithin 45 degrees
            # Linear velocity - reduce when close to target
            if distance > 1.0:
                twist.linear.x = 0.3  # Full speed when far
            else:
                twist.linear.x = max(0.1, distance * 0.3)  # Slow down when close
        else:
            # Turn in place when not facing target
            twist.linear.x = 0.0

        # Angular velocity - proportional to angular error with limits
        twist.angular.z = max(-0.8, min(0.8, angular_error * 2.0))  # Stronger angular control

        # print(f"\r{self.name}: angle_to_target={angle_to_target:.2f}, current_yaw={current_yaw:.2f}, angular_error={angular_error:.2f}", end="")
        angular_msg = f"\r{self.name}: angle_to_target={angle_to_target:.2f}, current_yaw={current_yaw:.2f}, angular_error={angular_error:.2f}"
        print(distance_msg+"\n"+angular_msg+"\n"+"\033[2A",end="")

        if self.cmd_vel_pub:
            self.cmd_vel_pub.publish(twist)

        return py_trees.common.Status.RUNNING


# ---- Helper for goals ----
def make_pose(x, y, yaw=0.0, frame="map"):
    import math
    pose = PoseStamped()
    pose.header.frame_id = frame
    # Leave timestamp as zero to use current time
    pose.header.stamp.sec = 0
    pose.header.stamp.nanosec = 0
    pose.pose.position.x = float(x)
    pose.pose.position.y = float(y)
    pose.pose.position.z = 0.0

    # Convert yaw to quaternion
    pose.pose.orientation.x = 0.0
    pose.pose.orientation.y = 0.0
    pose.pose.orientation.z = math.sin(yaw / 2.0)
    pose.pose.orientation.w = math.cos(yaw / 2.0)

    print(f"Goal created: x={x}, y={y}, yaw={yaw}, frame={frame}")
    return pose


# ---- Tree Definition ----
def create_root():
    root = py_trees.composites.Sequence("RootSequence", memory=True)

    # Move to specific coordinates
    move1 = MoveToPosition("MoveToPoint1", 1.0, 1.0, tolerance=0.45)  # Move to (1.0, 1.0) with 1.5m tolerance
    task1 = PrintHello("Task1")
    move2 = MoveToPosition("MoveToPoint2", -1.0, -1.0, tolerance=0.45)  # Move to (-1.0, -1.0) with 1.5m tolerance
    task2 = PrintHi("Task2")

    # Add to sequence
    root.add_children([move1, task1, move2, task2])
    return root


# ---- Main ----
def main():
    rclpy.init()

    root = create_root()

    # ROS2 node wrapper
    node = py_trees_ros.trees.BehaviourTree(root)

    try:
        node.setup(timeout=10.0, node=node.node)  # Pass the ROS node to behaviors

        # Execute the tree with regular timer
        print("Behavior tree starting...")

        tree_completed = False  # Track completion state

        def tick_tree():
            nonlocal tree_completed

            if tree_completed:
                return  # Don't execute if already completed

            result = node.tick_tock(period_ms=100)
            #print(f"Tree status: {node.root.status}")

            # Print status of each child
            for i, child in enumerate(node.root.children):
                print(f"  Child {i} ({child.name}): {child.status}")

            if node.root.status == py_trees.common.Status.SUCCESS:
                print("Behavior tree completed successfully!")
                tree_completed = True  # Mark as completed
                return
            elif node.root.status == py_trees.common.Status.FAILURE:
                print("Behavior tree failed!")
                tree_completed = True  # Mark as completed
                return

        # Create a regular timer to execute the tree
        timer = node.node.create_timer(0.5, tick_tree)  # Execute every 0.5 seconds

        print("Behavior tree is running...")
        rclpy.spin(node.node)
    except RuntimeError as e:
        print(f"Setup failed: {e}")
        print("Make sure Nav2 is running: ros2 launch nav2_bringup navigation_launch.py")
    except KeyboardInterrupt:
        pass
    finally:
        node.shutdown()
        rclpy.shutdown()


if __name__ == "__main__":
    main()



以上の内容はhttps://seinzumtode.hatenadiary.jp/entry/2025/09/23/225214より取得しました。
このページはhttp://font.textar.tv/のウェブフォントを使用してます

不具合報告/要望等はこちらへお願いします。
モバイルやる夫Viewer Ver0.14