mirror of
https://github.com/hydralauncher/hydra.git
synced 2025-01-23 21:44:55 +03:00
36 lines
1.1 KiB
Python
36 lines
1.1 KiB
Python
import platform
|
|
|
|
class Fifo:
|
|
socket_handle = None
|
|
|
|
def __init__(self, path: str):
|
|
if platform.system() == "Windows":
|
|
import win32file
|
|
|
|
self.socket_handle = win32file.CreateFile(path, win32file.GENERIC_READ | win32file.GENERIC_WRITE,
|
|
0, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None)
|
|
else:
|
|
import socket
|
|
self.socket_handle = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
self.socket_handle.connect(path)
|
|
|
|
def recv(self, bufSize: int):
|
|
if platform.system() == "Windows":
|
|
import win32file
|
|
|
|
result, data = win32file.ReadFile(self.socket_handle, bufSize)
|
|
return data
|
|
else:
|
|
return self.socket_handle.recv(bufSize)
|
|
|
|
def send_message(self, msg: str):
|
|
buffer = bytearray(1024 * 2)
|
|
buffer[:len(msg)] = bytes(msg, "utf-8")
|
|
|
|
if platform.system() == "Windows":
|
|
import win32file
|
|
|
|
win32file.WriteFile(self.socket_handle, buffer)
|
|
else:
|
|
self.socket_handle.send(buffer)
|