Python Sample Exploitation ETL
This sample shows the steps needed to implement a file exploitation ETL in Truxton. You can see this same sample implemented in C. Add your ETL to the Truxton Service for it to automatically start with the other ETL processes.
Sample File Format
This sample will exploit a fake file format we call Acme. Acme Corporation is a known supplier of nefarious devices and explosives. Their file format begins with a five-byte magic value followed by eleven bytes in a data structure. If the sixth byte in the file (zero-based offset 5) is 0x11, then it is a serial number file that uniquely identifies the user.
0000h: 88 77 66 55 00 11 22 33 44 55 66 77 88 99 AA BB
0010h: CC
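To make the layout concrete, here is a minimal sketch in plain Python (no Truxton calls) that splits the dump above into the fields just described. The names `ACME_MAGIC`, `SERIAL_RECORD`, and `describe_acme` are hypothetical, and the magic value is an assumption taken from the first five bytes of the dump.

```python
# Assumption: the magic value is the first five bytes shown in the dump.
ACME_MAGIC = bytes.fromhex("8877665500")
# The sixth byte (offset 5) is 0x11 for a serial number file.
SERIAL_RECORD = 0x11


def describe_acme(data: bytes) -> dict:
    """Split a raw Acme buffer into the fields described in the text."""
    if len(data) < 6 or data[:5] != ACME_MAGIC:
        raise ValueError("not an Acme file")
    return {
        "magic": data[:5].hex(),
        "record_type": data[5],
        "is_serial_file": data[5] == SERIAL_RECORD,
        # Eleven bytes of structure follow the five-byte magic value.
        "structure": data[5:16].hex(),
    }


# The 17 bytes from the hex dump.
sample = bytes.fromhex("8877665500112233445566778899AABBCC")
info = describe_acme(sample)
print(info["is_serial_file"], info["structure"])
```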
Source Code
 1 import sys
 2 sys.path.append('C:/Program Files/Truxton/SDK')
 3 import truxton
 4
 5 def main() -> None:
 6
 7     etl = truxton.etl()
 8     etl.name = "Acme Exploitation"
 9     etl.description = "This exploits Acme Corporation data files"
10     etl.queue = "wiley"
11     etl.stage = 40
12     etl.addtype(11111)
13
14     message = etl.getmessage()
15
16     while message is not None:
17         with message.file() as file_in_truxton:
18
19             file_in_truxton.seek(5)
20
21             next_byte = file_in_truxton.read(1)
22             if next_byte[0] == 0x11:
23                 # Serial Number. The next 8 bytes are a serial number
24                 file_in_truxton.seek(6)
25                 serial_number = file_in_truxton.read(8)
26
27                 artifact = file_in_truxton.newartifact()
28                 artifact.type = truxton.ENTITY_TYPE_SERIAL_NUMBER
29                 artifact.value = serial_number.hex()
30                 artifact.datatype = truxton.DATA_TYPE_uint8_t
31                 artifact.offset = 6
32                 artifact.length = 8
33                 artifact.save()
34
35         # Pause here until we get another message from the "wiley" message queue
36         message = etl.getmessage()
37
38     return None
39
40 if __name__ == "__main__":
41     sys.exit(main())
Code Walkthrough
Lines 7-12 set up the ETL. The message queue name will be "wiley", the ETL runs at an early stage (40), and it asks to receive Acme files (11111 was chosen as the identifier of Acme files).
Line 14 starts the ETL logic and waits until a message arrives on the "wiley" queue.
Line 17 opens the file so we can read from it.
Lines 21-22 read the sixth byte in the file and check whether it marks a serial number file.
Lines 27-33 create an artifact (which will be stored in the [Entity] table in the database) and save it to Truxton.
Line 28 sets the type of artifact to a serial number. This allows analysts to quickly find items of interest by their type.
Line 30 stores the format of how the serial number was stored in the file.
Line 33 saves the data to Truxton. It will create a record in the [Entity] table in the database. Saving the artifact will cause Truxton to route it to any ETL that has subscribed to Type_Artifact messages.
Line 36 pauses your ETL until a new message arrives on its queue.
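The read-and-check step in the walkthrough indexes `next_byte[0]`, which raises `IndexError` if the file is shorter than six bytes. A minimal sketch of a more defensive version follows. It assumes the object returned by `message.file()` behaves like an ordinary binary file object (the listing only shows `seek` and `read`), so it is exercised here against `io.BytesIO`. The helper name `read_serial` and the constants are hypothetical.

```python
import io
from typing import BinaryIO, Optional

SERIAL_MARKER = 0x11   # value of the sixth byte in a serial number file
SERIAL_OFFSET = 6      # the serial number starts right after the marker
SERIAL_LENGTH = 8      # the serial number is eight bytes long


def read_serial(stream: BinaryIO) -> Optional[str]:
    """Return the serial number as lowercase hex, or None if absent."""
    stream.seek(5)
    marker = stream.read(1)
    # read() returns fewer bytes than requested at end of file.
    if len(marker) != 1 or marker[0] != SERIAL_MARKER:
        return None
    stream.seek(SERIAL_OFFSET)
    serial = stream.read(SERIAL_LENGTH)
    if len(serial) != SERIAL_LENGTH:
        return None  # truncated file
    return serial.hex()


sample = bytes.fromhex("8877665500112233445566778899AABBCC")
print(read_serial(io.BytesIO(sample)))
print(read_serial(io.BytesIO(b"\x88\x77")))
```

Inside the ETL loop, a helper like this could be called with `file_in_truxton`, and the artifact created only when the result is not `None`.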
Development and Debugging
Truxton ETLs assume they are part of a processing stream instead of processing a file from the local system. This can slow your development cycle down. Here's one strategy that will make your development iterations quicker:
- Perform a load with your sample file in it.
- Find the file using the desktop GUI and copy the file identifier.
- Stop the Truxton service. This will prevent a "real" ETL from grabbing your file.
- Modify your program to use the sendmefileid() function.
- Now every time you run your program you will immediately receive that file to play with.
import sys
sys.path.append('C:/Program Files/Truxton/SDK')
import truxton

def main() -> None:
    etl = truxton.etl()
    etl.name = "Acme Exploitation"
    etl.description = "This exploits Acme Corporation data files"
    etl.queue = "wiley"
    etl.stage = 40
    etl.addtype(11000)
    etl.sendmefileid("5f06ef4d-03dd-5258-2758-378e00000011")
    message = etl.getmessage()
    while message is not None:
        with message.file() as file_in_truxton:
            file_in_truxton.seek(5)
            next_byte = file_in_truxton.read(1)
            if next_byte[0] == 0x11:
                # Serial Number. The next 8 bytes are a serial number
                file_in_truxton.seek(6)
                serial_number = file_in_truxton.read(8)
                artifact = file_in_truxton.newartifact()
                artifact.type = truxton.ENTITY_TYPE_SERIAL_NUMBER
                artifact.value = serial_number.hex()
                artifact.datatype = truxton.DATA_TYPE_uint8_t
                artifact.offset = 6
                artifact.length = 8
                artifact.save()
        # Pause here until we get another message from the "wiley" message queue
        message = etl.getmessage()
    return None

if __name__ == "__main__":
    sys.exit(main())
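The first step of the strategy above needs a sample file to load. One way to produce it, sketched here under assumptions, is to write the bytes from the hex dump in the Sample File Format section to disk. The file name `sample.acme` and the helper `write_sample` are hypothetical. This only creates the file; loading it into Truxton is a separate step not shown here.

```python
from pathlib import Path

# The 17 bytes from the hex dump in the Sample File Format section.
SAMPLE_BYTES = bytes.fromhex(
    "88 77 66 55 00 11 22 33 44 55 66 77 88 99 AA BB CC"
)


def write_sample(path: Path) -> Path:
    """Write the sample Acme bytes to `path` and return the path."""
    path.write_bytes(SAMPLE_BYTES)
    return path


if __name__ == "__main__":
    # Hypothetical file name; any name works for a test load.
    print(write_sample(Path("sample.acme")))
```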