1
0
mirror of https://github.com/ThunderEX/py-kms.git synced 2025-04-18 07:44:00 +03:00
py-kms/kmsRequestV5.py
2018-02-08 22:30:59 +08:00

134 lines
4.1 KiB
Python

import os
import binascii
import hashlib
import pyaes
from kmsBase import kmsRequestStruct, kmsResponseStruct, kmsBase
from structure import Structure
class kmsRequestV5(kmsBase):
class RequestV5(Structure):
class Message(Structure):
commonHdr = ()
structure = (
('salt', '16s'),
('encrypted', '240s'), #kmsRequestStruct
)
commonHdr = ()
structure = (
('bodyLength1', '<I=2 + 2 + len(message)'),
('bodyLength2', '<I=2 + 2 + len(message)'),
('versionMinor', '<H'),
('versionMajor', '<H'),
('message', ':', Message),
)
class ResponseV5(Structure):
commonHdr = ()
structure = (
('bodyLength1', '<I=2 + 2 + len(salt) + len(encrypted)'),
('unknown', '!I=0x00000200'),
('bodyLength2', '<I=2 + 2 + len(salt) + len(encrypted)'),
('versionMinor', '<H'),
('versionMajor', '<H'),
('salt', '16s'),
('encrypted', ':'), #DecryptedResponse
('padding', ':=bytearray(4 + (((~bodyLength1 & 3) + 1) & 3))'), # https://forums.mydigitallife.info/threads/71213-Source-C-KMS-Server-from-Microsoft-Toolkit?p=1277542&viewfull=1#post1277542
)
class DecryptedResponse(Structure):
commonHdr = ()
structure = (
('response', ':', kmsResponseStruct),
('keys', '16s'),
('hash', '32s'),
)
key = b'\xCD\x7E\x79\x6F\x2A\xB2\x5D\xCB\x55\xFF\xC8\xEF\x83\x64\xC4\x70'
v6 = False
ver = 5
def executeRequestLogic(self):
requestData = self.RequestV5(self.data)
decrypted = self.decryptRequest(requestData)
responseBuffer = self.serverLogic(decrypted)
iv, encrypted = self.encryptResponse(requestData, decrypted, responseBuffer)
return self.generateResponse(iv, encrypted, requestData)
def decryptRequest(self, request):
encrypted = request['message']['encrypted']
iv = request['message']['salt']
decrypter = pyaes.Decrypter(pyaes.AESModeOfOperationCBC(self.key, iv, v6=self.v6))
decrypted = decrypter.feed(encrypted) + decrypter.feed()
return kmsRequestStruct(decrypted)
def encryptResponse(self, request, decrypted, response):
randomSalt = bytearray(os.urandom(16))
result = hashlib.sha256(bytes(randomSalt)).digest()
iv = bytearray(request['message']['salt'])
XorSalts = pyaes.AES(self.key, v6=self.v6).decrypt(iv)
randomStuff = bytearray(16)
for i in range(0,16):
randomStuff[i] = (bytearray(XorSalts)[i] ^ randomSalt[i]) & 0xff
responsedata = self.DecryptedResponse()
responsedata['response'] = response
responsedata['keys'] = bytes(randomStuff)
responsedata['hash'] = result
encrypter = pyaes.Encrypter(pyaes.AESModeOfOperationCBC(self.key, iv, v6=self.v6))
crypted = encrypter.feed(responsedata.__bytes__()) + encrypter.feed()
return bytes(iv), crypted
def decryptResponse(self, response):
paddingLength = len(response.packField('padding'))
iv = response['salt']
encrypted = response['encrypted'][:-paddingLength]
decrypter = pyaes.Decrypter(pyaes.AESModeOfOperationCBC(self.key, iv, v6=self.v6))
decrypted = decrypter.feed(encrypted) + decrypter.feed()
return self.DecryptedResponse(decrypted)
def generateResponse(self, iv, encryptedResponse, requestData):
response = self.ResponseV5()
response['versionMinor'] = requestData['versionMinor']
response['versionMajor'] = requestData['versionMajor']
response['salt'] = iv
response['encrypted'] = encryptedResponse
if self.config['debug']:
print("KMS V%d Response: %s" % (self.ver, response.dump()))
print("KMS V%d Structue Bytes: %s" % (self.ver, binascii.b2a_hex(response.__bytes__())))
return response
def generateRequest(self, requestBase):
salt = os.urandom(16)
message = self.RequestV5.Message()
message['salt'] = salt
encrypter = pyaes.Encrypter(pyaes.AESModeOfOperationCBC(self.key, salt, v6=self.v6))
message['encrypted'] = encrypter.feed(requestBase) + encrypter.feed()
request = self.RequestV5()
request['versionMinor'] = requestBase['versionMinor']
request['versionMajor'] = requestBase['versionMajor']
request['message'] = message
if self.config['debug']:
print("Request V%d Data: %s" % (self.ver, request.dump()))
print("Request V%d: %s" % (self.ver, binascii.b2a_hex(bytes(request))))
return request