Python Sample Identification ETL
This sample shows the steps needed to implement a byte identifier ETL in Python. You can see this same sample implemented in C. After identifying the file type, you will next need to exploit it. Add your ETL to the Truxton Service for it to automatically start with the other ETL processes.
Sample File Format
This sample will identify a fake file format we call Acme. Acme Corporation is a known supplier of nefarious devices and explosives. Their file format begins with a five-byte magic value followed by eleven bytes of structured data.
0000h: 88 77 66 55 00 11 22 33 44 55 66 77 88 99 AA BB
0010h: CC
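To experiment with the identification rule outside Truxton, the header check can be sketched in plain Python. The names ACME_MAGIC, ACME_MIN_LENGTH and looks_like_acme are illustrative and not part of the Truxton SDK; the minimum length of 16 is the five magic bytes plus the eleven bytes that follow.

```python
# Illustrative only: these names are not part of the Truxton SDK.
ACME_MAGIC = bytes([0x88, 0x77, 0x66, 0x55, 0x00])  # five-byte magic value
ACME_MIN_LENGTH = 16  # 5 magic bytes + 11 bytes of structured data


def looks_like_acme(data: bytes) -> bool:
    """Return True when data is long enough and starts with the Acme magic."""
    return len(data) >= ACME_MIN_LENGTH and data.startswith(ACME_MAGIC)


# The bytes from the hex dump above.
sample = bytes.fromhex("88 77 66 55 00 11 22 33 44 55 66 77 88 99 AA BB CC")
```

Calling looks_like_acme(sample) accepts the dump, while a buffer shorter than 16 bytes or one with a non-zero fifth byte is rejected.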
Source Code
 1 import sys
 2 sys.path.append('C:/Program Files/Truxton/SDK')
 3 import truxton
 4
 5 # This is a File Identifier ETL
 6 # An Acme File begins with 0x88 0x77 0x66 0x55 0x00 and is at least 16 bytes long
 7
 8 def main() -> None:
 9
10     etl = truxton.etl()
11     etl.name = "Acme Identifier"
12     etl.description = "This ETL identifies files using the Acme method"
13     etl.queue = "acme"
14
15     # Pick an early stage
16     etl.stage = 2
17
18     # We are an identifier, the Loader attempts to identify files first, if it can't it
19     # will give them a type of Type_Unknown
20     # We will grab those files and run them through Acme algorithms
21     # Tell Truxton that we want files of unknown format
22     etl.addtype(truxton.Type_Unknown)
23
24     # Pause here until a message is received from our "acme" message queue
25     message = etl.getmessage()
26
27     while message is not None:
28         # We can actually use the message to shortcut the identification process
29         # If our file type begins with a fixed series of bytes we can check the first
30         # four bytes of the contents without having to open the file and read from it.
31         # The "signature" member of the message contains the first four bytes of the file.
32         if message.depotlength >= 16 and message.signature == 0x88776655:
33             # There are enough bytes to attempt identification and the first four bytes match
34             # We will now open the file contents
35             with message.file() as file_in_truxton:
36
37                 # Since the first four bytes were already checked, let's read the fifth byte
38                 file_in_truxton.seek(4)
39                 next_byte = file_in_truxton.read(1)
40
41                 if next_byte[0] == 0:
42                     # Yes! This is an Acme file. Change the database record
43                     file_in_truxton.changetype(11000)
44
45                     # We changed the database, now we need to send this file to any ETL
46                     # exploitation processes that registered to receive Acme files
47                     # First, change the file type in the message
48                     message.filetype = 11000
49
50                     # Now send this message to those ETL processes
51                     message.route()
52
53         # Pause here until we get another message from the "acme" message queue
54         message = etl.getmessage()
55
56 if __name__ == "__main__":
57     sys.exit(main())
Code Walkthrough
The above code shows how to create an ETL process, register for a particular type of file, receive messages from a message queue, read the contents of a file in Truxton, and send a message to other ETL processes.
Lines 10-22 set up the ETL.
The message queue name will be "acme"; we run at an early stage and want to receive Type_Unknown files.
Line 25 starts the ETL logic and waits until a message arrives on the "acme" queue.
Line 32 looks at the data in the message to see if it is even possible for the file we have received to be an Acme file.
It first checks depotlength to confirm the file holds at least 16 bytes.
The signature member contains the first four bytes of the file.
Acme's format has a five-byte signature, which means we will have to read bytes from the file in order to perform a valid check.
Opening a file is rather expensive and we want identification to be as fast as possible.
By using signature to check the first four bytes, we avoid the performance hit of reading from a file that can't possibly be Acme.
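The comparison against 0x88776655 suggests that signature holds the first four bytes of the file packed most significant byte first. Assuming that is the case (the byte order is inferred from the sample, not confirmed by SDK documentation), the equivalent value can be computed from raw bytes as sketched here; signature_of is a hypothetical helper.

```python
def signature_of(data: bytes) -> int:
    """Pack the first four bytes into an integer, most significant byte first.

    Assumption: this mirrors the value the sample compares message.signature to.
    """
    return int.from_bytes(data[:4], byteorder="big")


# The five-byte Acme magic value; only the first four bytes fit in the signature.
header = bytes([0x88, 0x77, 0x66, 0x55, 0x00])
matches = signature_of(header) == 0x88776655
```

The fifth byte (0x00) is not covered by this value, which is why the sample still has to open the file.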
Line 35 gives you a read-only Python file object so you can read from it.
Lines 38-41 read the fifth byte in the file and check it for validity.
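The seek-and-read step can be tried against any binary file object. The sketch below uses io.BytesIO as a stand-in for the object returned by message.file(); fifth_byte_is_zero is a hypothetical helper. It also guards against a short read, which the sample avoids by checking depotlength first.

```python
import io


def fifth_byte_is_zero(stream) -> bool:
    """Seek past the four signature bytes and test the fifth byte."""
    stream.seek(4)
    next_byte = stream.read(1)
    # read() returns an empty bytes object at end of file
    return len(next_byte) == 1 and next_byte[0] == 0


# In-memory stand-ins for file contents; they differ only in the fifth byte.
acme_like = io.BytesIO(bytes.fromhex("8877665500112233445566778899aabb"))
not_acme = io.BytesIO(bytes.fromhex("8877665501112233445566778899aabb"))
```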
Line 43 changes the file type to our identifier for Acme. We talked about file type identifiers in a previous article.
Lines 48-51 begin the process of sending this newly identified file to any ETL process that has registered for it.
The first step is to overwrite the filetype member, which should contain Type_Unknown, with the identifier for our file type.
The last step is to call route(), which tells Truxton to send this message to any ETLs that want it.
Line 54 completes the loop by getting another message from the acme message queue.
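One way to exercise the loop without a running Truxton is to pass in objects that imitate the members the sample uses. FakeFile, FakeMessage, FakeEtl and identify_loop below are hypothetical test stand-ins, not SDK classes; only the member names (getmessage, depotlength, signature, file, changetype, filetype, route) are taken from the sample, and the signature byte order is an assumption.

```python
import io

ACME_TYPE = 11000  # file type identifier used in the sample


class FakeFile(io.BytesIO):
    """Stand-in for the object returned by message.file()."""

    def __init__(self, data: bytes) -> None:
        super().__init__(data)
        self.new_type = None

    def changetype(self, file_type: int) -> None:
        self.new_type = file_type


class FakeMessage:
    """Stand-in for a queue message; holds the file bytes in memory."""

    def __init__(self, data: bytes) -> None:
        self.depotlength = len(data)
        self.signature = int.from_bytes(data[:4], "big")  # assumed byte order
        self.filetype = 0  # placeholder for Type_Unknown
        self.routed = False
        self._data = data

    def file(self) -> FakeFile:
        return FakeFile(self._data)

    def route(self) -> None:
        self.routed = True


class FakeEtl:
    """Stand-in for the ETL object; returns None when the queue is empty."""

    def __init__(self, messages) -> None:
        self._messages = list(messages)

    def getmessage(self):
        return self._messages.pop(0) if self._messages else None


def identify_loop(etl) -> None:
    """Same control flow as the sample's while loop."""
    message = etl.getmessage()
    while message is not None:
        if message.depotlength >= 16 and message.signature == 0x88776655:
            with message.file() as f:
                f.seek(4)
                if f.read(1)[0] == 0:
                    f.changetype(ACME_TYPE)
                    message.filetype = ACME_TYPE
                    message.route()
        message = etl.getmessage()


acme = FakeMessage(bytes.fromhex("8877665500112233445566778899aabb"))
other = FakeMessage(b"not an acme file at all")
identify_loop(FakeEtl([acme, other]))
```

After the call, acme.routed is True and acme.filetype is 11000, while other is left untouched.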
Source Code No Comments
import sys
sys.path.append('C:/Program Files/Truxton/SDK')
import truxton

def main():
    etl = truxton.etl()
    etl.name = "Acme Identifier"
    etl.description = "This ETL identifies files using the Acme method"
    etl.queue = "acme"
    etl.stage = 2
    etl.addtype(truxton.Type_Unknown)
    message = etl.getmessage()
    while message is not None:
        if message.depotlength >= 16 and message.signature == 0x88776655:
            with message.file() as file_in_truxton:
                file_in_truxton.seek(4)
                next_byte = file_in_truxton.read(1)
                if next_byte[0] == 0:
                    file_in_truxton.changetype(11000)
                    message.filetype = 11000
                    message.route()
        message = etl.getmessage()

if __name__ == "__main__":
    sys.exit(main())