import serial
ser = serial.Serial('/dev/ttyUSB0', 19200, timeout=1)
# [First Case]
if ser.is_open:
for x in range(10):
data = data + x
ser.write(data)
ser.close()
# [Second Case]
if ser.is_open:
data = ser.read(100)
print(data)
ser.close()
import serial
import time
import rospy
from geometry_msgs.msg import Pose
from geometry_msgs.msg import Twist
import sys
import signal
# ctrl + c -> exit program
def signal_handler(signal, frame):
print('You pressed Ctrl+C!')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
def cmd_callback(msg):
print("cmd_callback!")
cmd = Twist()
cmd.linear.x = msg.linear.x
cmd.linear.y = msg.linear.y
cmd.linear.z = msg.linear.z
cmd.angular.x = msg.angular.x
cmd.angular.y = msg.angular.y
cmd.angular.z = msg.angular.z
ser = serial.Serial('/dev/ttyUSB0', 19200, timeout=1)
if ser.is_open:
print("ser is open")
data = 's,%.2f,%.2f,%.2f,%.4f,%.4f,%.4f,'%(cmd.linear.x, cmd.linear.y, cmd.linear.z, cmd.angular.x, cmd.angular.y, cmd.angular.z)
while len(data) < 80:
data = data + 'x'
ser.write(data)
**time.sleep(0.5) ***important*****
else:
ser = serial.Serial('/dev/ttyUSB0', 19200, timeout=1)
if __name__ == '__main__':
ser = serial.Serial('/dev/ttyUSB0', 19200, timeout=1)
rospy.init_node("NUC_Node", anonymous=True)
received_odom_pub = rospy.Publisher("/received_pose", Pose, queue_size=1)
send_to_cmd_vel = rospy.Subscriber("/cmd_vel", Twist, cmd_callback)
while 1:
try:
if ser.is_open:
d=ser.read(80)
b=d.split(',')
if b[0]!='s':
ser.close()
else:
pmsg = Pose()
pmsg.position.x = float(b[1])
pmsg.position.y = float(b[2])
pmsg.position.z = float(b[3])
pmsg.orientation.x = float(b[4])
pmsg.orientation.y = float(b[5])
pmsg.orientation.z = float(b[6])
pmsg.orientation.w = float(b[7])
received_odom_pub.publish(pmsg)
else:
ser = serial.Serial('/dev/ttyUSB0', 19200, timeout=1)
except (SystemExit, KeyboardInterrupt) :
ser.close() # close port
sys.exit(0)
"""
Created on Thu Jan 21 04:52:04 2021
@author: EungChang Mason Lee
@based on EungChang Mason Lee codes
"""
import serial
import time
import rospy
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Twist
import sys
import signal
def signal_handler(signal, frame):
print('Pressed Ctrl+C')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
''' class '''
class serial_to_NUC():
def __init__(self):
rospy.init_node('serial_connector', anonymous=True)
self.cmd_pub = rospy.Publisher('/cmd_vel', Twist, queue_size=1)
self.odom_sub = rospy.Subscriber('/odom', Odometry, self.odom_callback)
self.rate = rospy.Rate(30)
self.ser = serial.Serial('/dev/ttyUSB0', 19200, timeout=1)
def odom_callback(self, msg):
if self.ser.is_open:
data = 's,%.2f,%.2f,%.2f,%.6f,%.6f,%.6f,%.6f,'%(msg.pose.pose.position.x, msg.pose.pose.position.y, msg.pose.pose.position.z, msg.pose.pose.orientation.x, msg.pose.pose.orientation.y, msg.pose.pose.orientation.z, msg.pose.pose.orientation.w)
while len(data)<80:
data= data + 'x'
self.ser.write(data)
**time.sleep(0.5) ***important*****
def twist_pub(self,msg):
self.cmd_pub.publish(msg)
if __name__== '__main__':
ser_nuc = serial_to_NUC()
twist_msg= Twist()
twist_msg.linear.x = 0
twist_msg.linear.y = 0
twist_msg.linear.z = 0
twist_msg.angular.x = 0
twist_msg.angular.y = 0
twist_msg.angular.z = 0
while 1:
try:
if ser_nuc.ser.is_open:
d=ser_nuc.ser.read(80)
if not(len(d)<80):
if(d):
try:
x =[i for i in d.split(',')]
print(x)
twist_msg.linear.x = float(x[1])
twist_msg.linear.y = float(x[2])
twist_msg.linear.z = float(x[3])
twist_msg.angular.x = float(x[4])
twist_msg.angular.y = float(x[5])
twist_msg.angular.z = float(x[6])
except:
print("corrupted message")
else:
print("empty message")
ser_nuc.twist_pub(twist_msg)
ser_nuc.rate.sleep()
except (SystemExit, KeyboardInterrupt):
ser_nuc.ser.close()
sys.exit(0)