mirror of
https://github.com/elseif/MikroTikPatch.git
synced 2025-01-23 21:44:59 +03:00
0ff0485a87
modified: npk.py modified: patch.py
359 lines
16 KiB
Python
359 lines
16 KiB
Python
|
|
import struct,zlib
|
|
import argparse,os
|
|
from datetime import datetime
|
|
from dataclasses import dataclass
|
|
from enum import IntEnum
|
|
class NpkPartID(IntEnum):
    """Numeric identifiers of the parts that make up an NPK package."""

    NAME_INFO = 0x01         # Package information: name, version, etc.
    DESCRIPTION = 0x02       # Package description
    DEPENDENCIES = 0x03      # Package dependencies
    FILE_CONTAINER = 0x04    # Files container, zlib 1.2.3
    INSTALL_SCRIPT = 0x07    # Install script
    UNINSTALL_SCRIPT = 0x08  # Uninstall script
    SIGNATURE = 0x09         # Package signature
    ARCHITECTURE = 0x10      # Package architecture (e.g. i386)
    PKG_CONFLICTS = 0x11     # Package conflicts
    PKG_INFO = 0x12
    FEATURES = 0x13
    PKG_FEATURES = 0x14
    SQUASHFS = 0x15          # SquashFS image
    NULL_BLOCK = 0x16
    GIT_COMMIT = 0x17        # Git commit
    CHANNEL = 0x18           # Release type (e.g. stable, testing, etc.)
    HEADER = 0x19
|
|
|
|
@dataclass
class NpkPartItem:
    """One part of a package: its identifier together with its payload."""

    # Which kind of part this is.
    id: NpkPartID
    # Either the raw payload bytes or a parsed object exposing ``serialize()``.
    data: bytes | object
|
|
|
|
class NpkInfo:
    """Fixed-size package information record: name, version and build time.

    Binary layout (little-endian, see ``_format``): a 16-byte NUL-padded
    name, a 4-byte packed version, a 32-bit build timestamp and 8 trailing
    bytes whose meaning is unknown.
    """

    _format = '<16s4sI8s'

    def __init__(self, name: str, version: str, build_time=None, unknow=b'\x00' * 8):
        """Create a record.

        Args:
            name: Package name; only the first 16 characters are kept.
            version: Version string ``major.minor.revision.build`` where
                build is one of alpha, beta, rc, final or test.
            build_time: ``datetime`` of the build. Defaults to the time of
                the call (not the time the module was imported).
            unknow: Trailing bytes stored verbatim.

        Raises:
            ValueError: If *version* is malformed.
        """
        if build_time is None:
            # Resolved per call: a ``datetime.now()`` default in the signature
            # would be evaluated only once, when the class is defined.
            build_time = datetime.now()
        self._name = name[:16].encode().ljust(16, b'\x00')
        self._version = self.encode_version(version)
        self._build_time = int(build_time.timestamp())
        self._unknow = unknow

    def serialize(self) -> bytes:
        """Return the packed binary form of this record."""
        return struct.pack(self._format, self._name, self._version, self._build_time, self._unknow)

    @classmethod
    def unserialize_from(cls, data: bytes) -> 'NpkInfo':
        """Parse *data*, which must be exactly one packed record.

        The raw name, version and timestamp fields are kept as read, so
        ``unserialize_from(x).serialize() == x`` even when the version uses
        a build byte that ``decode_version`` reports as ``unknown``.
        """
        assert len(data) == struct.calcsize(cls._format), 'Invalid data length'
        name, version, build_time, unknow = struct.unpack_from(cls._format, data)
        # Bypass __init__ on purpose: going through decode_version /
        # encode_version would rewrite version bytes it cannot represent.
        info = cls.__new__(cls)
        info._name = name
        info._version = version
        info._build_time = build_time
        info._unknow = unknow
        return info

    def __len__(self) -> int:
        """Size in bytes of the serialized record."""
        return struct.calcsize(self._format)

    @property
    def name(self) -> str:
        """Package name with the NUL padding removed."""
        return self._name.decode().strip('\x00')

    @name.setter
    def name(self, value: str):
        self._name = value[:16].encode().ljust(16, b'\x00')

    @staticmethod
    def decode_version(value: bytes):
        """Turn 4 packed version bytes into ``major.minor.revision.build``.

        The bytes are stored as revision, build, minor, major. Build byte
        102 means ``final``, or ``test`` when the top bit of the revision is
        set (that bit is then removed from the reported revision). Any build
        byte other than 97, 98, 99 or 102 is reported as ``unknown``.
        """
        revision, build, minor, major = struct.unpack_from('4B', value)
        if build == 102:
            if revision & 0x80:
                tag = 'test'
                revision &= 0x7f
            else:
                tag = 'final'
        else:
            tag = {97: 'alpha', 98: 'beta', 99: 'rc'}.get(build, 'unknown')
        return f'{major}.{minor}.{revision}.{tag}'

    @staticmethod
    def encode_version(value: str):
        """Pack a ``major.minor.revision.build`` string into 4 bytes.

        Raises:
            ValueError: If the string does not have exactly four dot-separated
                fields, the build tag is not one of alpha/beta/rc/final/test,
                or a numeric field is not an integer.
        """
        fields = value.split('.')
        # Reject anything that is not exactly four fields with a known tag.
        # The previous check used "and ... in", which let unknown tags fall
        # through as 'test' and raised IndexError on short strings.
        if len(fields) != 4 or fields[3] not in ('alpha', 'beta', 'rc', 'final', 'test'):
            raise ValueError('Invalid version string')
        major = int(fields[0])
        minor = int(fields[1])
        revision = int(fields[2])
        tag = fields[3]
        if tag == 'alpha':
            build = 97
        elif tag == 'beta':
            build = 98
        elif tag == 'rc':
            build = 99
        elif tag == 'final':
            build = 102
            revision &= 0x7f
        else:  # 'test' shares the build byte with 'final'; the high bit marks it
            build = 102
            revision |= 0x80
        return struct.pack('4B', revision, build, minor, major)

    @property
    def version(self) -> str:
        """Version as a ``major.minor.revision.build`` string."""
        return self.decode_version(self._version)

    @version.setter
    def version(self, value: str = '7.15.1.final'):
        self._version = self.encode_version(value)

    @property
    def build_time(self):
        """Build time as a local-time ``datetime``."""
        return datetime.fromtimestamp(self._build_time)

    @build_time.setter
    def build_time(self, value: datetime):
        self._build_time = int(value.timestamp())
|
|
|
|
class NpkNameInfo(NpkInfo):
    """Package information record with 12 trailing bytes instead of 8.

    Same fields as ``NpkInfo``; only the size of the trailing block of
    unknown bytes differs (see ``_format``).
    """

    _format = '<16s4sI12s'

    def __init__(self, name: str, version: str, build_time=None, unknow=b'\x00' * 12):
        """Create a record.

        Args:
            name: Package name; only the first 16 characters are kept.
            version: Version string ``major.minor.revision.build``.
            build_time: ``datetime`` of the build. Defaults to the time of
                the call (not the time the module was imported).
            unknow: Trailing bytes stored verbatim.
        """
        if build_time is None:
            # Resolved per call: a ``datetime.now()`` default in the signature
            # would be evaluated only once, when the class is defined.
            build_time = datetime.now()
        self._name = name[:16].encode().ljust(16, b'\x00')
        self._version = self.encode_version(version)
        self._build_time = int(build_time.timestamp())
        self._unknow = unknow

    def serialize(self) -> bytes:
        """Return the packed binary form of this record."""
        return struct.pack(self._format, self._name, self._version, self._build_time, self._unknow)

    @staticmethod
    def unserialize_from(data: bytes) -> 'NpkNameInfo':
        """Parse *data*, which must be exactly one packed record.

        The raw name, version and timestamp fields are kept as read, so a
        parse followed by ``serialize()`` reproduces *data* byte for byte,
        including version bytes that ``decode_version`` cannot name.
        """
        assert len(data) == struct.calcsize(NpkNameInfo._format), 'Invalid data length'
        name, version, build_time, unknow = struct.unpack_from(NpkNameInfo._format, data)
        # Bypass __init__ on purpose: decoding the version to text and
        # re-encoding it is lossy for build bytes outside 97/98/99/102.
        info = NpkNameInfo.__new__(NpkNameInfo)
        info._name = name
        info._version = version
        info._build_time = build_time
        info._unknow = unknow
        return info
|
|
|
|
class NpkFileContainer:
    """A zlib-compressed sequence of file entries.

    Each entry is a fixed header (``_format``) followed by the entry name
    and then the entry data; the concatenation of all entries is compressed
    as a single zlib stream.
    """

    _format = '<BB6sIBBBBIIIH'

    @dataclass
    class NpkFileItem:
        """One file entry; fields are listed in header order."""

        perm: int
        type: int
        usr_or_grp: bytes  # packed with '6s', so this is a 6-byte field
        modify_time: int
        revision: int
        rc: int
        minor: int
        major: int
        create_time: int
        unknow: int
        name: bytes
        data: bytes

    def __init__(self, items: list['NpkFileContainer.NpkFileItem'] = None):
        """Wrap *items*; omitting it (or passing ``None``) gives an empty container."""
        # Normalise None to an empty list so serialize/iteration/indexing
        # work on a default-constructed container.
        self._items = [] if items is None else items

    def serialize(self) -> bytes:
        """Return all entries packed and compressed into one zlib stream."""
        compressor = zlib.compressobj()
        chunks = []
        for item in self._items:
            header = struct.pack(
                self._format,
                item.perm, item.type, item.usr_or_grp, item.modify_time,
                item.revision, item.rc, item.minor, item.major,
                item.create_time, item.unknow,
                len(item.data), len(item.name),
            )
            chunks.append(compressor.compress(header + item.name + item.data))
        chunks.append(compressor.flush())
        return b''.join(chunks)

    @staticmethod
    def unserialize_from(data: bytes):
        """Decompress *data* and parse it into a ``NpkFileContainer``.

        Raises:
            zlib.error: If *data* is not a valid zlib stream.
            struct.error: If the stream ends in the middle of an entry header.
        """
        raw = zlib.decompress(data)
        header_size = struct.calcsize(NpkFileContainer._format)
        items: list['NpkFileContainer.NpkFileItem'] = []
        # Walk the buffer with an offset instead of re-slicing the remainder
        # after every entry, which copied the whole tail each time.
        offset = 0
        while offset < len(raw):
            (perm, kind, usr_or_grp, modify_time, revision, rc, minor, major,
             create_time, unknow, data_size, name_size) = struct.unpack_from(
                NpkFileContainer._format, raw, offset)
            name_start = offset + header_size
            data_start = name_start + name_size
            data_end = data_start + data_size
            items.append(NpkFileContainer.NpkFileItem(
                perm, kind, usr_or_grp, modify_time, revision, rc, minor, major,
                create_time, unknow, raw[name_start:data_start], raw[data_start:data_end]))
            offset = data_end
        return NpkFileContainer(items)

    def __len__(self) -> int:
        """Length in bytes of the serialized (compressed) container."""
        return len(self.serialize())

    def __getitem__(self, index: int) -> 'NpkFileContainer.NpkFileItem':
        return self._items[index]

    def __iter__(self):
        yield from self._items
|
|
|
|
class Package:
|
|
def __init__(self) -> None:
|
|
self._parts:list[NpkPartItem] = []
|
|
def __iter__(self):
|
|
for part in self._parts:
|
|
yield part
|
|
def __getitem__(self, id:NpkPartID):
|
|
for part in self._parts:
|
|
if part.id == id:
|
|
return part
|
|
part = NpkPartItem(id,b'')
|
|
self._parts.append(part)
|
|
return part
|
|
|
|
class NovaPackage(Package):
    """An NPK package parsed into parts and, optionally, sub-packages.

    The payload is a sequence of parts, each a 6-byte little-endian header
    (16-bit part id, 32-bit size) followed by the part data. Once a
    PKG_FEATURES part has been seen, every following part belongs to a
    sub-package, and each NAME_INFO part starts a new sub-package.

    A signature part holds 132 bytes: a 20-byte SHA1 digest, a 48-byte
    KCDSA signature and a 64-byte EdDSA signature.
    """

    NPK_MAGIC = 0xbad0f11e
    _SIGNATURE_SIZE = 20 + 48 + 64

    def __init__(self, data: bytes = b''):
        """Parse *data*, the package payload without the 8-byte file header.

        Raises:
            ValueError: If a part id is not a member of ``NpkPartID``.
            struct.error: If *data* ends in the middle of a part header.
        """
        super().__init__()
        self._packages: list[Package] = []
        self._has_pkg = False
        header_size = struct.calcsize('<HI')
        offset = 0
        while offset < len(data):
            raw_id, part_size = struct.unpack_from('<HI', data, offset)
            offset += header_size
            part_data = data[offset:offset + part_size]
            offset += part_size
            part_id = NpkPartID(raw_id)

            if part_id == NpkPartID.PKG_FEATURES:
                # From here on, parts are routed into sub-packages.
                self._has_pkg = True
                self._parts.append(NpkPartItem(part_id, part_data))
                continue

            if self._has_pkg:
                if part_id == NpkPartID.NAME_INFO:
                    # A NAME_INFO part opens a new sub-package.
                    self._packages.append(Package())
                    part_data = NpkNameInfo.unserialize_from(part_data)
                self._packages[-1]._parts.append(NpkPartItem(part_id, part_data))
            else:
                if part_id == NpkPartID.NAME_INFO:
                    part_data = NpkNameInfo.unserialize_from(part_data)
                elif part_id == NpkPartID.PKG_INFO:
                    part_data = NpkInfo.unserialize_from(part_data)
                self._parts.append(NpkPartItem(part_id, part_data))

    @staticmethod
    def _payload(part) -> bytes:
        """Return the bytes of *part*: raw data as is, parsed objects serialized."""
        if isinstance(part.data, bytes):
            return part.data
        return part.data.serialize()

    def _iter_all_parts(self):
        """Yield the top-level parts, then the parts of each sub-package, in file order."""
        yield from self._parts
        for package in self._packages:
            yield from package

    def get_digest(self, hash_fnc, package: Package = None) -> bytes:
        """Feed the signed portion of a part list into *hash_fnc* and return its digest.

        HEADER parts are skipped entirely. For every other part the 6-byte
        part header is hashed, followed by the part data when non-empty.
        Hashing stops at the SIGNATURE part, after its header has been
        hashed but before its data.

        Args:
            hash_fnc: A hash object providing ``update`` and ``digest``.
            package: Sub-package to hash; the top-level parts when omitted.
        """
        parts = package._parts if package else self._parts
        for part in parts:
            if part.id == NpkPartID.HEADER:
                continue
            hash_fnc.update(struct.pack('<HI', part.id.value, len(part.data)))
            if part.id == NpkPartID.SIGNATURE:
                break
            if part.data:
                hash_fnc.update(self._payload(part))
        return hash_fnc.digest()

    def sign(self, kcdsa_private_key: bytes, eddsa_private_key: bytes):
        """Recompute and store the signature of every sub-package, or of this
        package itself when it has no sub-packages.

        When the ``BUILD_TIME`` environment variable is set and non-empty, it
        is written as the build timestamp of the NAME_INFO record being
        signed (and of the top-level PKG_INFO record when sub-packages exist).
        """
        import hashlib
        from mikro import mikro_kcdsa_sign, mikro_eddsa_sign
        build_time = os.environ.get('BUILD_TIME')

        def sign_container(container):
            # Looking the part up creates an empty SIGNATURE part if needed.
            signature_part = container[NpkPartID.SIGNATURE]
            if len(signature_part.data) != self._SIGNATURE_SIZE:
                # The digest covers the signature header, which includes the
                # size, so the placeholder must have the final length.
                signature_part.data = b'\0' * self._SIGNATURE_SIZE
            if build_time:
                container[NpkPartID.NAME_INFO].data._build_time = int(build_time)
            sha1_digest = self.get_digest(hashlib.new('SHA1'), container)
            sha256_digest = self.get_digest(hashlib.new('SHA256'), container)
            kcdsa_signature = mikro_kcdsa_sign(sha256_digest[:20], kcdsa_private_key)
            eddsa_signature = mikro_eddsa_sign(sha256_digest, eddsa_private_key)
            signature_part.data = sha1_digest + kcdsa_signature + eddsa_signature

        if len(self._packages) > 0:
            if build_time:
                self[NpkPartID.PKG_INFO].data._build_time = int(build_time)
            for package in self._packages:
                sign_container(package)
        else:
            sign_container(self)

    def verify(self, kcdsa_public_key: bytes, eddsa_public_key: bytes):
        """Return True when every signature in the package checks out.

        Each sub-package is checked when there are any, otherwise the
        package itself. A check passes when the stored SHA1 digest matches
        and both the KCDSA and EdDSA signatures verify.
        """
        import hashlib
        from mikro import mikro_kcdsa_verify, mikro_eddsa_verify

        def container_is_valid(container) -> bool:
            # Digests are computed before the signature part is looked up,
            # because the lookup may append an empty SIGNATURE part.
            sha1_digest = self.get_digest(hashlib.new('SHA1'), container)
            sha256_digest = self.get_digest(hashlib.new('SHA256'), container)
            signature = container[NpkPartID.SIGNATURE].data
            if sha1_digest != signature[:20]:
                return False
            if not mikro_kcdsa_verify(sha256_digest[:20], signature[20:68], kcdsa_public_key):
                return False
            if not mikro_eddsa_verify(sha256_digest, signature[68:132], eddsa_public_key):
                return False
            return True

        if len(self._packages) > 0:
            return all(container_is_valid(package) for package in self._packages)
        return container_is_valid(self)

    def save(self, file):
        """Write the package to *file*: magic, payload size, then every part."""
        header_size = struct.calcsize('<HI')
        # The size is computed before the file is opened, so a failure here
        # does not leave a truncated file behind.
        size = sum(header_size + len(part.data) for part in self._iter_all_parts())
        with open(file, 'wb') as f:
            f.write(struct.pack('<II', NovaPackage.NPK_MAGIC, size))
            for part in self._iter_all_parts():
                f.write(struct.pack('<HI', part.id.value, len(part.data)))
                f.write(self._payload(part))

    @staticmethod
    def load(file):
        """Read *file* and return the parsed ``NovaPackage``.

        Raises:
            AssertionError: If the magic number is wrong or the size field
                does not equal the number of bytes after the 8-byte header.
        """
        with open(file, 'rb') as f:
            data = f.read()
        # NOTE(review): assert-based validation is skipped under ``python -O``;
        # kept so the exception type seen by callers does not change.
        assert int.from_bytes(data[:4], 'little') == NovaPackage.NPK_MAGIC, 'Invalid Nova Package Magic'
        assert int.from_bytes(data[4:8], 'little') == len(data) - 8, 'Invalid Nova Package Size'
        return NovaPackage(data[8:])
|
|
|
|
if __name__=='__main__':
|
|
parser = argparse.ArgumentParser(description='nova package creator and editor')
|
|
subparsers = parser.add_subparsers(dest="command")
|
|
sign_parser = subparsers.add_parser('sign',help='sign npk file')
|
|
sign_parser.add_argument('input',type=str, help='Input file')
|
|
sign_parser.add_argument('output',type=str,help='Output file')
|
|
verify_parser = subparsers.add_parser('verify',help='Verify npk file')
|
|
verify_parser.add_argument('input',type=str, help='Input file')
|
|
create_option_parser = subparsers.add_parser('create',help='Create option.npk file')
|
|
create_option_parser.add_argument('input',type=str,help='From npk file')
|
|
create_option_parser.add_argument('output',type=str,help='Output file')
|
|
create_option_parser.add_argument('name',type=str,help='NPK name')
|
|
create_option_parser.add_argument('squashfs',type=str,help='NPK squashfs file')
|
|
create_option_parser.add_argument('-desc','--description',type=str,help='NPK description')
|
|
args = parser.parse_args()
|
|
kcdsa_private_key = bytes.fromhex(os.environ['CUSTOM_LICENSE_PRIVATE_KEY'])
|
|
eddsa_private_key = bytes.fromhex(os.environ['CUSTOM_NPK_SIGN_PRIVATE_KEY'])
|
|
kcdsa_public_key = bytes.fromhex(os.environ['CUSTOM_LICENSE_PUBLIC_KEY'])
|
|
eddsa_public_key = bytes.fromhex(os.environ['CUSTOM_NPK_SIGN_PUBLIC_KEY'])
|
|
|
|
if args.command =='sign':
|
|
print(f'Signing {args.input}')
|
|
npk = NovaPackage.load(args.input)
|
|
npk.sign(kcdsa_private_key,eddsa_private_key)
|
|
npk.save(args.output)
|
|
elif args.command == 'verify':
|
|
npk = NovaPackage.load(args.input)
|
|
print(f'Verifying {args.input} ',end="")
|
|
if npk.verify(kcdsa_public_key,eddsa_public_key):
|
|
print('Valid')
|
|
exit(0)
|
|
else:
|
|
print('Invalid')
|
|
exit(-1)
|
|
elif args.command =='create':
|
|
print(f'Creating {args.output} from {args.input}')
|
|
option_npk = NovaPackage.load(args.input)
|
|
option_npk[NpkPartID.NAME_INFO].data.name = args.name
|
|
option_npk[NpkPartID.DESCRIPTION].data = args.description.encode() if args.description else args.name.encode()
|
|
option_npk[NpkPartID.NULL_BLOCK].data = b''
|
|
option_npk[NpkPartID.SQUASHFS].data = open(args.squashfs,'rb').read()
|
|
option_npk.sign(kcdsa_private_key,eddsa_private_key)
|
|
option_npk.save(args.output)
|
|
print(f'Created {args.output}')
|
|
else:
|
|
parser.print_help() |