はじめに
ようやくUnityとUDP通信で接続するところまでできたので、その方法を共有します☺️
Python
Unityとの接続にあたって、前回からいくつか変更点があります。
・画像表示をUDP送信に変更
・ランドマークのコールバックでは、受信データをバッファへ格納するだけに変更
・UDP送信データフォーマット変更
・UDP送信用のJSONデータ格納変数(送信バッファ)をQueueに変更
・UDP送信をスレッド化
送信バッファをQueueにするのは、データ取りこぼしの懸念があるからです。
UDP送信と他処理は非同期になるので、JSONデータ更新から次のJSONデータ更新タイミングまでの間にUDP送信しない場合、1回分のデータを送信できないことになります。
Queueを使わない場合↓
この取りこぼしをなくすためにQueueを使います。
Queueを使う場合↓
ソースコード
UDP送信サンプルコード (Python)
import time
import cv2
import mediapipe as mp
import numpy as np
from socket import socket, AF_INET, SOCK_DGRAM
import json
import queue
import threading
PORT = 65500
ADDRESS = "127.0.0.1"
face_model_path = './face_landmarker.task'
hands_model_path = './hand_landmarker.task'
BaseOptions = mp.tasks.BaseOptions
FaceLandmarker = mp.tasks.vision.FaceLandmarker
FaceLandmarkerOptions = mp.tasks.vision.FaceLandmarkerOptions
FaceLandmarkerResult = mp.tasks.vision.FaceLandmarkerResult
HandLandmarker = mp.tasks.vision.HandLandmarker
HandLandmarkerOptions = mp.tasks.vision.HandLandmarkerOptions
HandLandmarkerResult = mp.tasks.vision.HandLandmarkerResult
VisionRunningMode = mp.tasks.vision.RunningMode
face_blendshapes_rcv = queue.Queue()
hand_world_landmarks_rcv = queue.Queue()
th_udp_status = False # スレッド停止
#顔ランドマーク情報からJSONデータ作成
def get_face_blendshapes_json(detection_result):
face_json = None
face_json_tmp = {}
face_data = []
# 顔を認識できなくなるとエラー終了してしまうので、捉えているか判定する
if detection_result.face_blendshapes:
face_blendshapes_list = detection_result.face_blendshapes
#検出する顔は1つであることが前提
for face_bs in face_blendshapes_list[0]:
face_data.append(face_bs.score) # 配列face_dataにscoreを追加する
face_json_tmp['parts'] = "face"
face_json_tmp['data'] = face_data
face_json = json.dumps(face_json_tmp) #jsonデータに変換する
return face_json
#手ランドマーク情報からJSONデータ作成
def get_hands_world_landmarks_json(detection_result):
right_hand_json = None
left_hand_json = None
right_hand_json_tmp = {}
left_hand_json_tmp = {}
# 手を認識できなくなるとエラー終了してしまうので、捉えているか判定する
if detection_result.hand_world_landmarks:
hand_landmarks_list = detection_result.hand_world_landmarks
handedness_list = detection_result.handedness
right_hand_json_tmp['parts'] = "hand_right"
left_hand_json_tmp['parts'] = "hand_left"
# Loop through the detected hands to visualize.
for idx in range(len(hand_landmarks_list)):
hand_landmarks = hand_landmarks_list[idx]
handedness = handedness_list[idx]
if handedness[0].category_name == "Right":
right_hand_data_x = []
right_hand_data_y = []
right_hand_data_z = []
# ランドマークのxyz座標を取得
for landmark in hand_landmarks:
right_hand_data_x.append(landmark.x)
right_hand_data_y.append(landmark.y)
right_hand_data_z.append(landmark.z)
# xyz座標をまとめてキーに設定する
right_hand_json_tmp['x'] = right_hand_data_x
right_hand_json_tmp['y'] = right_hand_data_y
right_hand_json_tmp['z'] = right_hand_data_z
right_hand_json = json.dumps(right_hand_json_tmp) #jsonデータに変換する
elif handedness[0].category_name == "Left":
left_hand_data_x = []
left_hand_data_y = []
left_hand_data_z = []
# ランドマークのxyz座標を取得
for landmark in hand_landmarks:
left_hand_data_x.append(landmark.x)
left_hand_data_y.append(landmark.y)
left_hand_data_z.append(landmark.z)
# xyz座標をまとめてキーに設定する
left_hand_json_tmp['x'] = left_hand_data_x
left_hand_json_tmp['y'] = left_hand_data_y
left_hand_json_tmp['z'] = left_hand_data_z
left_hand_json = json.dumps(left_hand_json_tmp) #jsonデータに変換する
else:
pass
return right_hand_json, left_hand_json
# Create a face landmarker instance with the live stream mode:
def print_result_face(result: FaceLandmarkerResult, output_image: mp.Image, timestamp_ms: int):
global face_blendshapes_rcv
# 受信したランドマーク情報をキューにためる
face_blendshapes_rcv.put(result)
# Create a hand landmarker instance with the live stream mode:
def print_result_hands(result: HandLandmarkerResult, output_image: mp.Image, timestamp_ms: int):
global hand_world_landmarks_rcv
# 受信したランドマーク情報をキューにためる
hand_world_landmarks_rcv.put(result)
# UDP送信するスレッド
def send_udp():
global th_udp_status, face_blendshapes_rcv, hand_world_landmarks_rcv
s = socket(AF_INET, SOCK_DGRAM)
while th_udp_status:
face_json = None
right_hand_json = None
left_hand_json = None
if (face_blendshapes_rcv.qsize() != 0):
# 顔はblendshapeデータをjson形式にするだけ
face_json = get_face_blendshapes_json(face_blendshapes_rcv.get())
if (hand_world_landmarks_rcv.qsize() != 0):
# 両手はworld landmarkデータをjson形式にするだけ
right_hand_json, left_hand_json = get_hands_world_landmarks_json(hand_world_landmarks_rcv.get())
if (face_json is not None):
s.sendto(face_json.encode('utf-8'), (ADDRESS, PORT))
if (right_hand_json is not None):
s.sendto(right_hand_json.encode('utf-8'), (ADDRESS, PORT))
if (left_hand_json is not None):
s.sendto(left_hand_json.encode('utf-8'), (ADDRESS, PORT))
s.close()
face_options = FaceLandmarkerOptions(
base_options=BaseOptions(model_asset_path=face_model_path),
running_mode=VisionRunningMode.LIVE_STREAM,
num_faces=1,
min_face_detection_confidence=0.5,
min_tracking_confidence=0.5,
output_face_blendshapes=True,
result_callback=print_result_face)
hands_options = HandLandmarkerOptions(
base_options=BaseOptions(model_asset_path=hands_model_path),
running_mode=VisionRunningMode.LIVE_STREAM,
num_hands=2,
min_hand_detection_confidence=0.2,
min_tracking_confidence=0.2,
result_callback=print_result_hands)
face_landmarker = FaceLandmarker.create_from_options(face_options)
hands_landmarker = HandLandmarker.create_from_options(hands_options)
print("face_hands_udp start")
cap = cv2.VideoCapture(0)
th_udp = threading.Thread(target=send_udp)
th_udp_status = True # UDP送信スレッド開始
th_udp.start()
# カメラが有効の場合のみ処理する
while cap.isOpened():
# カメラから画像1枚取得
success, image = cap.read()
if not success:
print("Ignoring empty camera frame.")
continue
mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image)
frame_timestamp_ms = int(time.time() * 1000)
face_landmarker.detect_async(mp_image, frame_timestamp_ms)
hands_landmarker.detect_async(mp_image, frame_timestamp_ms)
# 黒画表示
img_h = 100
img_w = 200
blank = np.zeros((img_h, img_w, 3))
cv2.imshow('MediaPipe Send UDP', blank)
#終了判定 ESCで終了する
if cv2.waitKey(5) & 0xFF == 27:
th_udp_status = False # UDP送信スレッド停止
break
cap.release()
th_udp.join()
print("face_hands_udp end")
UDP送信する手のJSONデータは、前回までは2次元配列にしていましたが、Unity側のJSONと相性が悪く、xyzに分割しました。
UDP送信自体はスレッド化しました。
ランドマークをJSONに変換する処理がそこそこ重く、その間メインスレッドは止まってしまいます。
全体のリアルタイム性を確保するためにも、ランドマークの処理に直接影響しないUDP送信部分は別スレッドに逃しました。
Unity
こちらは環境構築から。
Unity Hubからインストールします。公式HPからダウンロードできます。
https://unity.com/ja/download
プロジェクト作成
Unity Hubをインストール後、起動してプロジェクト作成します。
今回はUniversal 3Dを選択します。
Universal 3Dを選択
プロジェクト名はお好みで
作成完了するとこんな画面になります
画面左のヒエラルキーでを右クリックし、3Dオブジェクトのキューブを追加します。
画面左下のプロジェクトのAssetsで右クリックし、作成 > Scripting > Empty C# ScriptでC#ファイルを作成します。
ファイル名は何でもOKです。
作成したC#スクリプトをヒエラルキーのCubeへドラッグ&ドロップすると、コンポーネントとして登録できます。
最終的には画像のようになります。
スクリプト(ソースコード)
C#スクリプトをダブルクリックすると開きますが、デフォルトではVisual Studioが開きます。
ただちょっと使いづらさを感じたので、VSCodeで開くようにします。
ウインドウ左上の
Unity > Settings… > 外部ツール > 外部のスクリプトエディター
で、プルダウンメニューからVSCodeを選択します。
スクリプトをダブルクリックしてVSCodeで開くと、こんな通知が来る場合があります。
インストールしました。
UDP受信サンプルコード (C#)
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Text;
using UnityEngine.UI;
[System.Serializable]
public class PartsData_face
{
public string parts = "";
public float[] data;
}
[System.Serializable]
public class PartsData_hand
{
public string parts = "";
public float[] x;
public float[] y;
public float[] z;
}
public class LocalUdpReceive : MonoBehaviour
{
static int localPort = 65500;
static UdpClient udpUnity;
public Queue<PartsData_face> rcvData_face;
public Queue<PartsData_hand> rcvData_righthand;
public Queue<PartsData_hand> rcvData_lefthand;
void Start()
{
udpUnity = new UdpClient(localPort);
udpUnity.BeginReceive(OnReceived, udpUnity);
Debug.Log("start");
rcvData_face = new Queue<PartsData_face>();
rcvData_righthand = new Queue<PartsData_hand>();
rcvData_lefthand = new Queue<PartsData_hand>();
}
static public byte[] unityData;
void Update()
{
if((rcvData_face != null) && (rcvData_face.Count != 0))
{
PartsData_face face = rcvData_face.Dequeue();
}
if((rcvData_righthand != null) && (rcvData_righthand.Count != 0))
{
PartsData_hand right_hand = rcvData_righthand.Dequeue();
}
if((rcvData_lefthand != null) && (rcvData_lefthand.Count != 0))
{
PartsData_hand left_hand = rcvData_lefthand.Dequeue();
//試しにboxを動かしてみる 左手の人差し指に連動する
Transform myTransform = this.transform;
Vector3 pos = myTransform.position;
pos.x = left_hand.x[8]*(-100);
pos.y = left_hand.y[8]*(-50);
pos.z = left_hand.z[8]*(-100);
myTransform.position = pos;
}
}
private void OnApplicationQuit()
{
if (udpUnity != null) udpUnity.Close();
}
private void OnDestroy()
{
if (udpUnity != null) udpUnity.Close();
}
private void OnReceived(System.IAsyncResult result) {
UdpClient getUdp = (UdpClient) result.AsyncState;
IPEndPoint ipEnd = null;
byte[] getByte = getUdp.EndReceive(result, ref ipEnd);
string text = Encoding.UTF8.GetString(getByte);
if (text.Contains("face"))
{
PartsData_face json_face = JsonUtility.FromJson<PartsData_face>(text);
rcvData_face.Enqueue(json_face);
}
if (text.Contains("hand_right"))
{
PartsData_hand json_hand = JsonUtility.FromJson<PartsData_hand>(text);
rcvData_righthand.Enqueue(json_hand);
}
if (text.Contains("hand_left"))
{
PartsData_hand json_hand = JsonUtility.FromJson<PartsData_hand>(text);
rcvData_lefthand.Enqueue(json_hand);
}
getUdp.BeginReceive(OnReceived, getUdp);
}
}
Unityでは、オブジェクトが生成されたときにStart関数が1度だけコールされます。主に初期化に使用します。
それ以降はUpdate関数が、画面更新周期ごとにコールされます。
終了したときは、OnApplicationQuitが1度だけコールされます。
ソースを見ていきます。
[System.Serializable]
public class PartsData_face
{
public string parts = "";
public float[] data;
}
[System.Serializable]
public class PartsData_hand
{
public string parts = "";
public float[] x;
public float[] y;
public float[] z;
}
JSONデータ受信用のクラスを作成します。
受信データをJSONフォーマットとして変換するにはいくつかの方法がありますが、今回はJsonUtilityを使用します。この場合は[System.Serializable]を付与する必要があります。
クラス変数は、JSONデータ内のキーの名前と一致させます。
void Update()
{
if((rcvData_face != null) && (rcvData_face.Count != 0))
{
PartsData_face face = rcvData_face.Dequeue();
}
if((rcvData_righthand != null) && (rcvData_righthand.Count != 0))
{
PartsData_hand right_hand = rcvData_righthand.Dequeue();
}
if((rcvData_lefthand != null) && (rcvData_lefthand.Count != 0))
{
PartsData_hand left_hand = rcvData_lefthand.Dequeue();
//試しにboxを動かしてみる 左手の人差し指に連動する
Transform myTransform = this.transform;
Vector3 pos = myTransform.position;
pos.x = left_hand.x[8]*(-100);
pos.y = left_hand.y[8]*(-50);
pos.z = left_hand.z[8]*(-100);
myTransform.position = pos;
}
}
Countでキューにたまっている数を取得できます。
0でなければ、Dequeueで先頭から1つ取り出して処理を行います。
今回はお試しとして、左手の人差し指でCubeを動かすようにしています。
受信データのままだとスケールが小さいので、適当な係数を掛けています。
private void OnReceived(System.IAsyncResult result) {
UdpClient getUdp = (UdpClient) result.AsyncState;
IPEndPoint ipEnd = null;
byte[] getByte = getUdp.EndReceive(result, ref ipEnd);
string text = Encoding.UTF8.GetString(getByte);
if (text.Contains("face"))
{
PartsData_face json_face = JsonUtility.FromJson<PartsData_face>(text);
rcvData_face.Enqueue(json_face);
}
if (text.Contains("hand_right"))
{
PartsData_hand json_hand = JsonUtility.FromJson<PartsData_hand>(text);
rcvData_righthand.Enqueue(json_hand);
}
if (text.Contains("hand_left"))
{
PartsData_hand json_hand = JsonUtility.FromJson<PartsData_hand>(text);
rcvData_lefthand.Enqueue(json_hand);
}
getUdp.BeginReceive(OnReceived, getUdp);
}
受信データをUTF-8でエンコードして、textに格納します。
受信データ内を文字列検索し、顔・右手・左手ごとのキューへ格納します。
動作確認
Unityのエディタ上側にあるPlay(再生ボタン)を押すと、受信待機します。
VSCodeでPython(Mediapipe)側のファイルを開き、実行ボタンを押すと、送信が始まります。
カメラに向かって左手を動かすと、人差し指に追従してCubeが動きます。
動きがカクカクしているのはGIFだからというのもありますが、、Update()で掛ける係数が粗い(1フレームで大きく動きすぎ)ので、ここは調整の余地アリです。
不明点
Unityで動作確認したときコンソールを見ると、こんなエラーが出ていました。
SocketException: Address already in use
「ポートはすでに使われているから別のを使いなよ」的な例外のようです。
ただ最初から出ていたわけではなく、どこかのコード変更によるものでもなさそうです。
送受信するポートの変更や、再起動を試しましたがダメでした。何だろう🤔
この状態でも動作はするので、一旦置いておきます。
さいごに
ようやく、Unityでオブジェクトを動かすところまでできました😁
JSONのシリアライズのしかたなど、慣れない部分に手間取りましたが、良い勉強になりました。
今後、他のオブジェクトも動かせるようにしていく予定です。
それでは、今回はここまで。
ありがとうございました😊
コメント