restructure and improve hass_ws api

This commit is contained in:
Nitwel 2024-05-20 12:53:22 +02:00
parent 1365a48e83
commit a892af092a
12 changed files with 314 additions and 246 deletions

View File

@ -1,36 +1,31 @@
extends RefCounted
class_name Promise
enum Status {
RESOLVED,
REJECTED
}
signal settled(status: PromiseResult)
signal resolved(value: Variant)
signal rejected(reason: Rejection)
## Generic rejection reason
const PROMISE_REJECTED := "Promise rejected"
var is_settled := false
func _init(callable: Callable):
resolved.connect(
func(value: Variant):
is_settled = true
settled.emit(PromiseResult.new(Status.RESOLVED, value)),
func(value: Variant):
is_settled=true
settled.emit(PromiseResult.new(Status.RESOLVED, value)),
CONNECT_ONE_SHOT
)
rejected.connect(
func(rejection: Rejection):
is_settled = true
settled.emit(PromiseResult.new(Status.REJECTED, rejection)),
is_settled=true
settled.emit(PromiseResult.new(Status.REJECTED, rejection)),
CONNECT_ONE_SHOT
)
@ -42,28 +37,25 @@ func _init(callable: Callable):
if not is_settled:
rejected.emit(rejection)
)
func then(resolved_callback: Callable) -> Promise:
resolved.connect(
resolved_callback,
resolved_callback,
CONNECT_ONE_SHOT
)
return self
func catch(rejected_callback: Callable) -> Promise:
rejected.connect(
rejected_callback,
rejected_callback,
CONNECT_ONE_SHOT
)
return self
static func from(input_signal: Signal) -> Promise:
return Promise.new(
func(resolve: Callable, _reject: Callable):
var number_of_args := input_signal.get_object().get_signal_list() \
var number_of_args:=input_signal.get_object().get_signal_list() \
.filter(func(signal_info: Dictionary) -> bool: return signal_info["name"] == input_signal.get_name()) \
.map(func(signal_info: Dictionary) -> int: return signal_info["args"].size()) \
.front() as int
@ -73,23 +65,21 @@ static func from(input_signal: Signal) -> Promise:
resolve.call(null)
else:
# only one arg in signal is allowed for now
var result = await input_signal
var result=await input_signal
resolve.call(result)
)
static func from_many(input_signals: Array[Signal]) -> Array[Promise]:
return input_signals.map(
func(input_signal: Signal):
func(input_signal: Signal):
return Promise.from(input_signal)
)
static func all(promises: Array[Promise]) -> Promise:
return Promise.new(
func(resolve: Callable, reject: Callable):
var resolved_promises: Array[bool] = []
var results := []
var resolved_promises: Array[bool]=[]
var results:=[]
results.resize(promises.size())
resolved_promises.resize(promises.size())
resolved_promises.fill(false)
@ -97,8 +87,8 @@ static func all(promises: Array[Promise]) -> Promise:
for i in promises.size():
promises[i].then(
func(value: Variant):
results[i] = value
resolved_promises[i] = true
results[i]=value
resolved_promises[i]=true
if resolved_promises.all(func(value: bool): return value):
resolve.call(results)
).catch(
@ -107,30 +97,28 @@ static func all(promises: Array[Promise]) -> Promise:
)
)
static func any(promises: Array[Promise]) -> Promise:
return Promise.new(
func(resolve: Callable, reject: Callable):
var rejected_promises: Array[bool] = []
var rejections: Array[Rejection] = []
var rejected_promises: Array[bool]=[]
var rejections: Array[Rejection]=[]
rejections.resize(promises.size())
rejected_promises.resize(promises.size())
rejected_promises.fill(false)
for i in promises.size():
promises[i].then(
func(value: Variant):
func(value: Variant):
resolve.call(value)
).catch(
func(rejection: Rejection):
rejections[i] = rejection
rejected_promises[i] = true
rejections[i]=rejection
rejected_promises[i]=true
if rejected_promises.all(func(value: bool): return value):
reject.call(PromiseAnyRejection.new(PROMISE_REJECTED, rejections))
)
)
class PromiseResult:
var status: Status
var payload: Variant
@ -139,7 +127,6 @@ class PromiseResult:
status = _status
payload = _payload
class Rejection:
var reason: String
var stack: Array
@ -148,14 +135,12 @@ class Rejection:
reason = _reason
stack = get_stack() if OS.is_debug_build() else []
func as_string() -> String:
return ("%s\n" % reason) + "\n".join(
stack.map(
func(dict: Dictionary) -> String:
func(dict: Dictionary) -> String:
return "At %s:%i:%s" % [dict["source"], dict["line"], dict["function"]]
))
class PromiseAnyRejection extends Rejection:
var group: Array[Rejection]

View File

@ -43,9 +43,6 @@ transform = Transform3D(0.967043, 0.24582, -0.0663439, -0.0663439, 0.494837, 0.8
[node name="MiddleTip" parent="XROrigin3D/XRControllerLeft" index="6"]
transform = Transform3D(0.98042, 0.196912, 0.00149799, 0.001498, -0.015065, 0.999885, 0.196912, -0.980305, -0.0150651, -0.00327212, -0.00771427, -0.176318)
[node name="Palm" parent="XROrigin3D/XRControllerLeft" index="7"]
transform = Transform3D(1, 3.12364e-06, -3.13861e-06, -3.12371e-06, 1, -1.97886e-05, 3.13854e-06, 1.97889e-05, 1, 0.0307807, -0.0419721, -0.0399505)
[node name="XRControllerRight" parent="XROrigin3D" instance=ExtResource("7_0b3tc")]
transform = Transform3D(0.999999, -1.39635e-11, 0, 1.31553e-10, 1, 0, 0, 0, 1, 0.336726, 0.575093, -0.437942)

View File

@ -8,7 +8,7 @@
[ext_resource type="Texture2D" uid="uid://bl33klueufwja" path="res://assets/cursors/pointer.png" id="6_ypel5"]
[ext_resource type="Texture2D" uid="uid://churthrr24yhw" path="res://assets/cursors/old.png" id="7_un12x"]
[sub_resource type="ShaderMaterial" id="ShaderMaterial_51ev7"]
[sub_resource type="ShaderMaterial" id="ShaderMaterial_5amor"]
resource_local_to_scene = true
render_priority = 10
shader = ExtResource("4_v4u0l")
@ -23,7 +23,7 @@ shader_parameter/corner_radius = 0.2
shader_parameter/roughness = 0.3
shader_parameter/grain_amount = 0.02
[sub_resource type="QuadMesh" id="QuadMesh_gh4gw"]
[sub_resource type="QuadMesh" id="QuadMesh_i5pey"]
size = Vector2(0.04, 0.04)
[sub_resource type="BoxShape3D" id="BoxShape3D_01skh"]
@ -34,7 +34,7 @@ size = Vector3(0.04, 0.04, 0.01)
resource_local_to_scene = true
size = Vector3(0.04, 0.04, 0.03)
[sub_resource type="ShaderMaterial" id="ShaderMaterial_mn1g0"]
[sub_resource type="ShaderMaterial" id="ShaderMaterial_dsrxt"]
resource_local_to_scene = true
render_priority = 10
shader = ExtResource("4_v4u0l")
@ -49,7 +49,7 @@ shader_parameter/corner_radius = 0.2
shader_parameter/roughness = 0.3
shader_parameter/grain_amount = 0.02
[sub_resource type="QuadMesh" id="QuadMesh_jbtbu"]
[sub_resource type="QuadMesh" id="QuadMesh_7pl3m"]
size = Vector2(0.04, 0.04)
[node name="FeaturesMenu" type="Node3D"]
@ -93,8 +93,8 @@ icon = true
toggleable = true
[node name="Panel3D" parent="CursorOptions/CircleCursor/Body" index="0"]
material_override = SubResource("ShaderMaterial_51ev7")
mesh = SubResource("QuadMesh_gh4gw")
material_override = SubResource("ShaderMaterial_5amor")
mesh = SubResource("QuadMesh_i5pey")
[node name="CollisionShape3D" parent="CursorOptions/CircleCursor/Body" index="1"]
shape = SubResource("BoxShape3D_01skh")
@ -126,8 +126,8 @@ icon = true
toggleable = true
[node name="Panel3D" parent="CursorOptions/RetroCursor/Body" index="0"]
material_override = SubResource("ShaderMaterial_mn1g0")
mesh = SubResource("QuadMesh_jbtbu")
material_override = SubResource("ShaderMaterial_dsrxt")
mesh = SubResource("QuadMesh_7pl3m")
[node name="CollisionShape3D" parent="CursorOptions/RetroCursor/Body" index="1"]
shape = SubResource("BoxShape3D_01skh")

View File

@ -9,14 +9,6 @@ const apis = {
"hass_ws": HassWebSocket
}
const methods = [
"get_devices",
"get_device",
"get_state",
"set_state",
"watch_state"
]
var groups = EntityGroups.new()
## Emitted when the connection to the home automation system is established
@ -73,9 +65,6 @@ func start_adapter(type: String, url: String, token: String):
on_disconnect.emit()
)
for method in methods:
assert(api.has_method(method), "%s Api does not implement method: %s" % [type, method])
func _on_connect():
on_connect.emit()

View File

@ -0,0 +1,53 @@
extends Node
const Connection = preload ("connection.gd")
const TimedSignal = preload ("res://lib/utils/timed_signal.gd")
signal on_authenticated()
signal _try_auth(success: bool)
enum AuthError {
OK = 0,
INVALID_TOKEN = 1,
TIMEOUT = 2,
UNKNOWN = 3
}
var connection: Connection
var token: String
var authenticated := false
func _init(connection: Connection):
self.connection = connection
func authenticate(token: String=self.token) -> AuthError:
self.token = token
connection.on_packed_received.connect(_handle_message)
var error = await TimedSignal.timed_signal(self, _try_auth, 10.0)
if error == Error.ERR_TIMEOUT:
return AuthError.TIMEOUT
elif error == Error.ERR_CANT_RESOLVE:
return AuthError.INVALID_TOKEN
elif error == Error.OK:
return AuthError.OK
return AuthError.UNKNOWN
func _handle_message(message):
match message["type"]:
"auth_required":
connection.send_packet({"type": "auth", "access_token": self.token})
"auth_ok":
authenticated = true
_try_auth.emit(Error.OK)
on_authenticated.emit()
"auth_invalid":
_try_auth.emit(Error.ERR_CANT_RESOLVE)
EventSystem.notify("Failed to authenticate with Home Assistant. Check your token and try again.", EventNotify.Type.DANGER)
connection.handle_disconnect()
func on_disconnect():
authenticated = false

View File

@ -0,0 +1,187 @@
extends Node
const HASS_API = preload ("hass.gd")
const Auth = preload ("./auth.gd")
const TimedSignal = preload ("res://lib/utils/timed_signal.gd")
signal on_connect()
signal on_disconnect()
signal on_packed_received(packet: Dictionary)
signal _try_connect(success: bool)
const LOG_MESSAGES := false
var socket := WebSocketPeer.new()
var packet_callbacks := CallbackMap.new()
var api: HASS_API
var auth: Auth
var request_timeout := 10.0 # in seconds
var connection_timeout := 10.0 # in seconds
var connecting := false
var connected := false
var url := ""
var id := 1
enum ConnectionError {
OK = 0,
INVALID_URL = 1,
CONNECTION_FAILED = 2,
TIMEOUT = 3,
INVALID_TOKEN = 4
}
func _init(api: HASS_API):
self.api = api
auth = Auth.new(self)
add_child(auth)
# https://github.com/godotengine/godot/issues/84423
# Otherwise the WebSocketPeer will crash when receiving large packets
socket.set_inbound_buffer_size(pow(2, 23)) # ~8MB buffer
func start(url: String, token: String) -> ConnectionError:
if url == "":
return ConnectionError.INVALID_URL
if socket.get_ready_state() != WebSocketPeer.STATE_CLOSED:
socket.close()
if connecting or connected:
return ConnectionError.OK
connecting = true
print("Connecting to %s" % url + "/api/websocket")
var error = socket.connect_to_url(url + "/api/websocket")
if error != OK:
print("Error connecting to %s: %s" % [url, error])
return ConnectionError.CONNECTION_FAILED
set_process(true)
error = await TimedSignal.timed_signal(self, _try_connect, connection_timeout)
if error == Error.ERR_TIMEOUT:
print("Failed to connect to %s: Exceeded %ss" % [url, connection_timeout])
return ConnectionError.TIMEOUT
error = await auth.authenticate(token)
if error == Auth.AuthError.TIMEOUT:
return ConnectionError.TIMEOUT
elif error != Auth.AuthError.OK:
return ConnectionError.INVALID_TOKEN
connected = true
on_connect.emit()
return ConnectionError.OK
func _process(_delta):
socket.poll()
var state = socket.get_ready_state()
if state == WebSocketPeer.STATE_OPEN:
if connecting:
connecting = false
_try_connect.emit(Error.OK)
while socket.get_available_packet_count():
var packet = _decode_packet(socket.get_packet())
if typeof(packet) == TYPE_DICTIONARY:
handle_packet(packet)
elif typeof(packet) == TYPE_ARRAY:
for p in packet:
handle_packet(p)
elif state == WebSocketPeer.STATE_CLOSING:
pass
elif state == WebSocketPeer.STATE_CLOSED:
var code = socket.get_close_code()
var reason = socket.get_close_reason()
if reason == "":
reason = "Invalid URL"
print("WS connection closed with code: %s, reason: %s" % [code, reason])
set_process(false)
connecting = false
connected = false
on_disconnect.emit()
func handle_packet(packet: Dictionary):
if LOG_MESSAGES: print("Received packet: %s" % str(packet).substr(0, 1000))
on_packed_received.emit(packet)
if packet.has("id"):
packet_callbacks.call_key(int(packet.id), [packet])
func send_subscribe_packet(packet: Dictionary, callback: Callable):
packet.id = id
id += 1
packet_callbacks.add(packet.id, callback)
send_packet(packet)
return func():
packet_callbacks.remove(packet.id, callback)
send_packet({
id: id,
"type": packet.type.replace("subscribe", "unsubscribe"),
"subscription": packet.id
})
id += 1
func send_request_packet(packet: Dictionary, ignore_initial:=false):
packet.id = id
id += 1
var promise = Promise.new(func(resolve: Callable, reject: Callable):
var fn: Callable
if ignore_initial:
fn=func(packet: Dictionary):
if packet.type == "event":
resolve.call(packet)
packet_callbacks.remove(packet.id, fn)
packet_callbacks.add(packet.id, fn)
else:
packet_callbacks.add_once(packet.id, resolve)
var timeout=get_tree().create_timer(request_timeout)
timeout.timeout.connect(func():
reject.call(Promise.Rejection.new("Request timed out"))
if ignore_initial:
packet_callbacks.remove(packet.id, fn)
else:
packet_callbacks.remove(packet.id, resolve)
)
)
send_packet(packet)
return await promise.settled
func send_raw(packet: PackedByteArray):
if LOG_MESSAGES: print("Sending binary: %s" % packet.hex_encode())
socket.send(packet)
func send_packet(packet: Dictionary, with_id:=false):
if with_id:
packet.id = id
id += 1
if LOG_MESSAGES: print("Sending packet: %s" % _encode_packet(packet))
socket.send_text(_encode_packet(packet))
func _decode_packet(packet: PackedByteArray):
return JSON.parse_string(packet.get_string_from_utf8())
func _encode_packet(packet: Dictionary):
return JSON.stringify(packet)

View File

@ -33,11 +33,13 @@ var tts_sound = null:
func _init(hass: HASS_API):
self.api = hass
api.connection.on_packed_received.connect(handle_message)
func start_wakeword():
if pipe_running:
return
api.send_packet({
api.connection.send_packet({
"type": "assist_pipeline/run",
"start_stage": "wake_word",
"end_stage": "tts",

View File

@ -1,28 +0,0 @@
const HASS_API = preload ("../hass.gd")
signal on_authenticated()
var api: HASS_API
var url: String
var token: String
var authenticated := false
func _init(hass: HASS_API, url: String, token: String):
self.api = hass
self.url = url
self.token = token
func handle_message(message):
match message["type"]:
"auth_required":
api.send_packet({"type": "auth", "access_token": self.token})
"auth_ok":
authenticated = true
on_authenticated.emit()
"auth_invalid":
EventSystem.notify("Failed to authenticate with Home Assistant. Check your token and try again.", EventNotify.Type.DANGER)
api.handle_disconnect()
func on_disconnect():
authenticated = false

View File

@ -7,7 +7,7 @@ func _init(hass: HASS_API):
self.api = hass
func get_history(entity_id: String, start: String, end=null):
var meta_response = await api.send_request_packet({
var meta_response = await api.connection.send_request_packet({
"type": "recorder/get_statistics_metadata",
"statistic_ids": [
entity_id
@ -17,7 +17,7 @@ func get_history(entity_id: String, start: String, end=null):
if meta_response.status != OK:
return null
var data_response = await api.send_request_packet({
var data_response = await api.connection.send_request_packet({
"type": "recorder/statistics_during_period",
"start_time": start,
"statistic_ids": [

View File

@ -5,9 +5,10 @@ var integration_exists: bool = false
func _init(hass: HASS_API):
self.api = hass
test_integration.call_deferred()
func on_connect():
var response = await api.send_request_packet({
func test_integration():
var response = await api.connection.send_request_packet({
"type": "immersive_home/register",
"device_id": OS.get_unique_id(),
"name": OS.get_model_name(),
@ -16,4 +17,4 @@ func on_connect():
})
if response.status == Promise.Status.RESOLVED:
integration_exists = true
integration_exists = true

View File

@ -1,100 +1,47 @@
extends Node
const AuthHandler = preload ("./handlers/auth.gd")
const IntegrationHandler = preload ("./handlers/integration.gd")
const AssistHandler = preload ("./handlers/assist.gd")
const HistoryHandler = preload ("./handlers/history.gd")
const Connection = preload ("./connection.gd")
signal on_connect()
signal on_disconnect()
var connected := false
var devices_template := FileAccess.get_file_as_string("res://lib/home_apis/hass_ws/templates/devices.j2")
var socket := WebSocketPeer.new()
# in seconds
var request_timeout := 10.0
# var url := "wss://8ybjhqcinfcdyvzu.myfritz.net:8123/api/websocket"
var url := ""
var token := ""
var LOG_MESSAGES := false
var id := 1
var entities: Dictionary = {}
var entitiy_callbacks := CallbackMap.new()
var packet_callbacks := CallbackMap.new()
var auth_handler: AuthHandler
var connection: Connection
var integration_handler: IntegrationHandler
var assist_handler: AssistHandler
var history_handler: HistoryHandler
func _init(url:=self.url, token:=self.token):
self.url = url
self.token = token
func _init(url: String, token: String):
connection = Connection.new(self)
add_child(connection)
connection.on_disconnect.connect(func():
on_disconnect.emit()
)
var error = await connection.start(url, token)
if error != Connection.ConnectionError.OK:
print("Error starting connection: ", error)
return
auth_handler = AuthHandler.new(self, url, token)
integration_handler = IntegrationHandler.new(self)
assist_handler = AssistHandler.new(self)
history_handler = HistoryHandler.new(self)
start_subscriptions()
devices_template = devices_template.replace("\n", " ").replace("\t", "").replace("\r", " ")
connect_ws()
auth_handler.on_authenticated.connect(func():
start_subscriptions()
)
func connect_ws():
if url == ""||token == "":
return
print("Connecting to %s" % url + "/api/websocket")
socket.connect_to_url(url + "/api/websocket")
set_process(true)
# https://github.com/godotengine/godot/issues/84423
# Otherwise the WebSocketPeer will crash when receiving large packets
socket.set_inbound_buffer_size(pow(2, 23)) # ~8MB buffer
func _process(delta):
socket.poll()
var state = socket.get_ready_state()
if state == WebSocketPeer.STATE_OPEN:
while socket.get_available_packet_count():
var packet = decode_packet(socket.get_packet())
if typeof(packet) == TYPE_DICTIONARY:
handle_packet(packet)
elif typeof(packet) == TYPE_ARRAY:
for p in packet:
handle_packet(p)
elif state == WebSocketPeer.STATE_CLOSING:
pass
elif state == WebSocketPeer.STATE_CLOSED:
var code = socket.get_close_code()
var reason = socket.get_close_reason()
if reason == "":
reason = "Invalid URL"
var message = "WS connection closed with code: %s, reason: %s" % [code, reason]
EventSystem.notify(message, EventNotify.Type.DANGER)
print(message)
handle_disconnect()
func handle_packet(packet: Dictionary):
if LOG_MESSAGES: print("Received packet: %s" % str(packet).substr(0, 1000))
auth_handler.handle_message(packet)
assist_handler.handle_message(packet)
if packet.has("id"):
packet_callbacks.call_key(int(packet.id), [packet])
func start_subscriptions():
send_subscribe_packet({
connection.send_subscribe_packet({
"type": "subscribe_entities"
}, func(packet: Dictionary):
if packet.type != "event":
@ -107,7 +54,7 @@ func start_subscriptions():
"attributes": packet.event.a[entity]["a"]
}
entitiy_callbacks.call_key(entity, [entities[entity]])
handle_connect()
on_connect.emit()
if packet.event.has("c"):
for entity in packet.event.c.keys():
@ -122,90 +69,11 @@ func start_subscriptions():
entitiy_callbacks.call_key(entity, [entities[entity]])
)
func handle_connect():
integration_handler.on_connect()
connected = true
on_connect.emit()
func handle_disconnect():
auth_handler.on_disconnect()
set_process(false)
on_disconnect.emit()
func send_subscribe_packet(packet: Dictionary, callback: Callable):
packet.id = id
id += 1
packet_callbacks.add(packet.id, callback)
send_packet(packet)
return func():
packet_callbacks.remove(packet.id, callback)
send_packet({
id: id,
"type": packet.type.replace("subscribe", "unsubscribe"),
"subscription": packet.id
})
id += 1
func send_request_packet(packet: Dictionary, ignore_initial:=false):
packet.id = id
id += 1
var promise = Promise.new(func(resolve: Callable, reject: Callable):
var fn: Callable
if ignore_initial:
fn=func(packet: Dictionary):
if packet.type == "event":
resolve.call(packet)
packet_callbacks.remove(packet.id, fn)
packet_callbacks.add(packet.id, fn)
else:
packet_callbacks.add_once(packet.id, resolve)
var timeout=Timer.new()
timeout.set_wait_time(request_timeout)
timeout.set_one_shot(true)
timeout.timeout.connect(func():
reject.call(Promise.Rejection.new("Request timed out"))
if ignore_initial:
packet_callbacks.remove(packet.id, fn)
else:
packet_callbacks.remove(packet.id, resolve)
)
add_child(timeout)
timeout.start()
)
send_packet(packet)
return await promise.settled
func send_raw(packet: PackedByteArray):
if LOG_MESSAGES: print("Sending binary: %s" % packet.hex_encode())
socket.send(packet)
func send_packet(packet: Dictionary, with_id:=false):
if with_id:
packet.id = id
id += 1
if LOG_MESSAGES: print("Sending packet: %s" % encode_packet(packet))
socket.send_text(encode_packet(packet))
func decode_packet(packet: PackedByteArray):
return JSON.parse_string(packet.get_string_from_utf8())
func encode_packet(packet: Dictionary):
return JSON.stringify(packet)
func has_connected():
return connected
return connection.connected
func get_devices():
var result = await send_request_packet({
var result = await connection.send_request_packet({
"type": "render_template",
"template": devices_template,
"timeout": 3,
@ -270,7 +138,7 @@ func set_state(entity: String, state: Variant, attributes: Dictionary={}):
if service == null:
return null
return await send_request_packet({
return await connection.send_request_packet({
"type": "call_service",
"domain": domain,
"service": service,
@ -284,7 +152,7 @@ func has_integration():
return integration_handler.integration_exists
func update_room(room: String):
var response = await send_request_packet({
var response = await connection.send_request_packet({
"type": "immersive_home/update",
"device_id": OS.get_unique_id(),
"room": room

View File

@ -0,0 +1,14 @@
static func timed_signal(target: Node, target_signal: Signal, timeout: int):
var promise = Promise.new(func(resolve, reject):
var timer=target.get_tree().create_timer(timeout)
timer.timeout.connect(func():
resolve.call(Error.ERR_TIMEOUT)
)
target_signal.connect(func(result):
resolve.call(result))
)
var result = await promise.settled
return result.payload