You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ORPA-pyOpenRPA/Robot/daemon.py

296 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from http.server import BaseHTTPRequestHandler, HTTPServer
import pdb
import pywinauto
import json
import subprocess
import zlib
import os
#ProcessChildSendString
def ProcessChildSendString(lProcess,lString):
lByteString = zlib.compress(lString.encode("utf-8"))
#Вернуть потенциальные \n
lByteString = lByteString.replace(b'\n',b'{{n}}')
#Отправить сообщение в дочерний процесс
lProcess.stdin.write(lByteString+bytes('\n',"utf-8"))
#print(str(lByteString+bytes('\n',"utf-8")))
lProcess.stdin.flush()
#Вернуть результат
return
#ProcessChildReadWaitString
def ProcessChildReadWaitString(lProcess):
#Ожидаем ответ от процесса
lResult = lProcess.stdout.readline()
#Обработка спец символов
#print(b'NewLine: '+lResult)
#Вернуть потенциальные \n
lResult = lResult.replace(b'{{{n}}}',b'\n')
#Вернуть \r
lResult = lResult.replace(b'{{{r}}}',b'\r')
#Вернуть \0
lResult = lResult.replace(b'{{{0}}}',b'\0')
#Вернуть \a
lResult = lResult.replace(b'{{{a}}}',b'\a')
#Вернуть \b
lResult = lResult.replace(b'{{{b}}}',b'\b')
#Вернуть \t
lResult = lResult.replace(b'{{{t}}}',b'\t')
#Вернуть \v
lResult = lResult.replace(b'{{{v}}}',b'\v')
#Вернуть \f
lResult = lResult.replace(b'{{{f}}}',b'\f')
#print("check")
#print(str(lResult))
lResult = zlib.decompress(lResult[0:-1])
lResult = lResult.decode("utf-8")
#Вернуть результат
return lResult
#ProcessChildSendObject
def ProcessChildSendObject(inProcess,inObject):
#Выполнить отправку сконвертированного объекта в JSON
ProcessChildSendString(inProcess,json.dumps(inObject))
return
#ProcessChildReadWaitObject
def ProcessChildReadWaitObject(inProcess):
#Выполнить получение и разбор объекта
lResult=json.loads(ProcessChildReadWaitString(inProcess));
return lResult;
#ProcessChildSendReadWaitString
def ProcessChildSendReadWaitString(lProcess,lString):
ProcessChildSendString(lProcess,lString)
#Вернуть результат
return ProcessChildReadWaitString(lProcess)
#ProcessChildSendReadWaitObject
def ProcessChildSendReadWaitObject(inProcess,inObject):
ProcessChildSendObject(inProcess,inObject)
#Вернуть результат
return ProcessChildReadWaitString(inProcess)
#pywinauto
def checkBitnessIs64():
lResult=True
return lResult;
def GetControl(inControlSpecificationArray):
#Выполнить идентификацию объектов, если передан массив
lResultList=[];
if len(inControlSpecificationArray) > 0:
#Выполнить подключение к объекту
lRPAApplication = pywinauto.Application()
#Проверка разрядности
lRPAApplication.connect(**inControlSpecificationArray[0])
lTempObject=lRPAApplication.window(**inControlSpecificationArray[0])
#Циклическое прохождение к недрам объекта
for lWindowSpecification in inControlSpecificationArray[1:]:
lTempObject=lTempObject.window(**lWindowSpecification)
#Получить инфо объект
lTempObjectInfo = lTempObject.wrapper_object().element_info
#Добавить информацию об обнаруженом объекте
lResultList.append({"name":lTempObjectInfo.name, "rich_text":lTempObjectInfo.rich_text, "process_id":lTempObjectInfo.process_id,"handle":lTempObjectInfo.handle});
return lResultList
def GetChildControls(inControlSpecificationArray):
#Выполнить идентификацию объектов, если передан массив
lResultList=[];
if len(inControlSpecificationArray) > 0:
#Выполнить подключение к объекту
lRPAApplication = pywinauto.Application()
lRPAApplication.connect(**inControlSpecificationArray[0])
lTempObject=lRPAApplication.window(**inControlSpecificationArray[0])
#Циклическое прохождение к недрам объекта
for lWindowSpecification in inControlSpecificationArray[1:]:
lTempObject=lTempObject.window(**lWindowSpecification)
#Получить инфо объект
lTempObjectInfo = lTempObject.wrapper_object().element_info
#Добавить информацию об обнаруженом объекте
lResultList.append({"name":lTempObjectInfo.name, "rich_text":lTempObjectInfo.rich_text, "process_id":lTempObjectInfo.process_id,"handle":lTempObjectInfo.handle});
return lResultList
def GetWindowList(inWindowSpecificationArray = []):
#Выполнить идентификацию объектов, если передан массив
lResultList={};
if len(inWindowSpecificationArray) > 0:
#Выполнить подключение к объекту
lRPAApplication = pywinauto.Application()
lTempObject=lRPAApplication.connect(**inWindowSpecificationArray[0])
#Циклическое прохождение к недрам объекта
for lWindowSpecification in inWindowSpecificationArray[1:]:
lTempObject=lTempObject.window(**lWindowSpecification)
lTempObjectInfo = lTempObject.wrapper_object().element_info
lResultList2.append({"name":lTempObjectInfo.name, "rich_text":lTempObjectInfo.rich_text, "process_id":lTempObjectInfo.process_id,"handle":lTempObjectInfo.handle});
#Получить список объектов
lResultList=pywinauto.findwindows.find_elements()
else:
#Получить список объектов
lResultList=pywinauto.findwindows.find_elements()
lResultList2=[]
for lI in lResultList:
lResultList2.append({"name":lI.name, "rich_text":lI.rich_text, "process_id":lI.process_id,"handle":lI.handle});
return lResultList2
# HTTPRequestHandler class
class testHTTPServer_RequestHandler(BaseHTTPRequestHandler):
#ResponseContentTypeFile
def SendResponseContentTypeFile(self,inContentType,inFilePath):
# Send response status code
self.send_response(200)
# Send headers
self.send_header('Content-type',inContentType)
self.end_headers()
lFileObject = open(inFilePath, "rb")
# Write content as utf-8 data
self.wfile.write(lFileObject.read())
#Закрыть файловый объект
lFileObject.close()
# GET
def do_GET(self):
#Мост между файлом и http запросом (новый формат)
if self.path == "/":
self.SendResponseContentTypeFile('text/html',"Index.xhtml")
#Мост между файлом и http запросом (новый формат)
if self.path == '/3rdParty/Semantic-UI-CSS-master/semantic.min.css':
self.SendResponseContentTypeFile('text/css',"3rdParty\\Semantic-UI-CSS-master\\semantic.min.css")
#Мост между файлом и http запросом (новый формат)
if self.path == '/3rdParty/Semantic-UI-CSS-master/semantic.min.js':
self.SendResponseContentTypeFile('application/javascript',"3rdParty\\Semantic-UI-CSS-master\\semantic.min.js")
#Мост между файлом и http запросом (новый формат)
if self.path == '/3rdParty/jQuery/jquery-3.1.1.min.js':
self.SendResponseContentTypeFile('application/javascript',"3rdParty\\jQuery\\jquery-3.1.1.min.js")
#Мост между файлом и http запросом (новый формат)
if self.path == '/3rdParty/Google/LatoItalic.css':
self.SendResponseContentTypeFile('font/css',"3rdParty\\Google\\LatoItalic.css")
#Мост между файлом и http запросом (новый формат)
if self.path == '/3rdParty/Semantic-UI-CSS-master/themes/default/assets/fonts/icons.woff2':
self.SendResponseContentTypeFile('font/woff2',"3rdParty\\Semantic-UI-CSS-master\\themes\\default\\assets\\fonts\\icons.woff2")
#Мост между файлом и http запросом (новый формат)
if self.path == '/favicon.ico':
self.SendResponseContentTypeFile('image/x-icon',"favicon.ico")
#Action ObjectInspector GetObjectList
if self.path == '/ObjectDetector/JSONGetWindowList':
#ReadRequest
#lInputByteArray=self.rfile.read()
#print(str(len(os.stat(self.rfile).st_size)))
# Send response status code
self.send_response(200)
# Send headers
self.send_header('Content-type','application/json')
self.end_headers()
# Send message back to client
#{'functionName':'', 'argsArray':[]}
lRequestObject={'functionName':'ElementGetChildElementList','argsArray':[]}
#Отправить запрос в дочерний процесс, который отвечает за работу с Windows окнами
ProcessChildSendObject(p,lRequestObject)
#Получить ответ от дочернего процесса
lResponseObject=ProcessChildReadWaitObject(p)
message = json.dumps(lResponseObject)
# Write content as utf-8 data
self.wfile.write(bytes(message, "utf8"))
# POST
def do_POST(self):
#Action ObjectInspector GetObjectList
if self.path == '/ObjectDetector/JSONGetWindowListArgs':
#ReadRequest
lInputByteArrayLength = int(self.headers.get('Content-Length'))
lInputByteArray=self.rfile.read(lInputByteArrayLength)
#Превращение массива байт в объект
print(lInputByteArray.decode('utf8'))
lInputObject=json.loads(lInputByteArray.decode('utf8'))
# Send response status code
self.send_response(200)
# Send headers
self.send_header('Content-type','application/json')
self.end_headers()
# Send message back to client
#{'functionName':'', 'argsArray':[]}
lRequestObject={'functionName':'ElementGetChildElementList','argsArray':lInputObject}
#Отправить запрос в дочерний процесс, который отвечает за работу с Windows окнами
ProcessChildSendObject(p,lRequestObject)
#Получить ответ от дочернего процесса
lResponseObject=ProcessChildReadWaitObject(p)
print(str(lResponseObject))
message = json.dumps(lResponseObject)
# Write content as utf-8 data
self.wfile.write(bytes(message, "utf8"))
if self.path == '/GUIAction':
#ReadRequest
lInputByteArrayLength = int(self.headers.get('Content-Length'))
lInputByteArray=self.rfile.read(lInputByteArrayLength)
#Превращение массива байт в объект
lInputObject=json.loads(lInputByteArray.decode('utf8'))
# Send response status code
self.send_response(200)
# Send headers
self.send_header('Content-type','application/json')
self.end_headers()
# Send message back to client
#{'functionName':'', 'argsArray':[]}
#pdb.set_trace()
lRequestObject=lInputObject
#Отправить запрос в дочерний процесс, который отвечает за работу с Windows окнами
ProcessChildSendObject(p,lRequestObject)
#Получить ответ от дочернего процесса
lResponseObject=ProcessChildReadWaitObject(p)
message = json.dumps(lResponseObject)
# Write content as utf-8 data
self.wfile.write(bytes(message, "utf8"))
if self.path == '/GUIActionList':
#ReadRequest
lInputByteArrayLength = int(self.headers.get('Content-Length'))
lInputByteArray=self.rfile.read(lInputByteArrayLength)
#Превращение массива байт в объект
lInputObject=json.loads(lInputByteArray.decode('utf8'))
# Send response status code
self.send_response(200)
# Send headers
self.send_header('Content-type','application/json')
self.end_headers()
# Send message back to client
#{'functionName':'', 'argsArray':[]}
lRequestObject=lInputObject
lOutputObject=[]
#pdb.set_trace()
#Циклическая отправка запросов в дочерний объект
for lItem in lRequestObject:
#Отправить запрос в дочерний процесс, который отвечает за работу с Windows окнами
ProcessChildSendObject(p,lItem)
#Получить ответ от дочернего процесса
lResponseObject=ProcessChildReadWaitObject(p)
#Добавить в выходной массив
lOutputObject.append(lResponseObject)
#Сформировать текстовый ответ
message = json.dumps(lOutputObject)
# Write content as utf-8 data
self.wfile.write(bytes(message, "utf8"))
return
def run():
print('starting server...')
# Server settings
# Choose port 8080, for port 80, which is normally used for a http server, you need root access
server_address = ('127.0.0.1', 8081)
httpd = HTTPServer(server_address, testHTTPServer_RequestHandler)
print('running server...')
httpd.serve_forever()
#Start childprocess 32 bit
p = subprocess.Popen(['WPy32-3720\\python-3.7.2\\python.exe','winGUI.py'],stdin=subprocess.PIPE,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
#print(ChildProcessReadWaitString(p))
run()