GenomicDataStream
A scalable interface between data and analysis
Loading...
Searching...
No Matches
pgenstream.h
Go to the documentation of this file.
1/***********************************************************************
2 * @file pgenstream.h
3 * @author Gabriel Hoffman
4 * @email gabriel.hoffman@mssm.edu
5 * @brief reads a plink2/PGEN into matrix in chunks, storing variants in columns
6 * Copyright (C) 2024 Gabriel Hoffman
7 ***********************************************************************/
8
9
10#ifndef PGEN_STREAM_H_
11#define PGEN_STREAM_H_
12
13#ifndef DISABLE_EIGEN
14#include <Eigen/Sparse>
15#endif
16
17#include <string>
18#include <regex>
19#include <unordered_map>
20#include <algorithm>
21
22#include "VariantInfo.h"
24#include "GenomicRanges.h"
25#include "DataTable.h"
26#include "pgen/RPgenReader.h"
27#include "VariantSet.h"
28#include "utils.h"
29
30using namespace std;
31using namespace arma;
32
33namespace gds {
34
38class pgenstream :
39 public GenomicDataStream {
40 public:
41
43
47
48 genoFileType = getFileType(param.file);
49
50 // Parse PVAR/BIM file of variant positions and IDs
51 // evaluate subsetting of variants
52 process_variants();
53
54 if( genoFileType == PGEN ){
55 // Read index file (pvar)
56 pvar = new RPvar();
57 pvar->Load(fileIdx, true, true);
58 }
59
60 // if PBED, set missingToMean to TRUE
61 missingToMean = (genoFileType == PBED) ?
62 true : param.missingToMean;
63
64 // Parse PSAM/FAM file of sample identifiers
65 // evaluate subsetting of samples
66 process_samples();
67
68 // Read data file (pgen/bed)
69 pg = new RPgenReader();
70 pg->Load(param.file, pvar, n_samples_psam, sampleIdx1);
71
72 // Initialize vector with capacity to store nVariants
73 // Note, this allocates memory but does not change .size()
74 // After j variants have been inserted, only entries up to j*nsamples are populated
75 // the rest of the vector is allocated doesn't have valid data
76 matDosage.reserve( n_samples() * param.chunkSize );
77 }
78
82 if( vInfo != nullptr) delete vInfo;
83 if( pg != nullptr){
84 pg->Close();
85 delete pg;
86 }
87 if( pvar != nullptr){
88 pvar->Close();
89 delete pvar;
90 }
91 }
92
95 void setRegions(const vector<string> &regions ) override {
96
97 // Initialize genomic regions
98 // from delimited string
99 GenomicRanges gr( regions );
100 // if not empty
101 if( gr.size() != 0){
102
103 // Search is log time for each interval
104 varIdx = vs.getIndeces( gr );
105
106 }else{
107 varIdx.clear();
108 for(int i=0; i<dt.nrows(); i++){
109 varIdx.push_back(i);
110 }
111 }
112
113 // total number of requested variants
114 n_requested_variants = varIdx.size();
115
116 // set current position in varIdx
117 currentIdx = 0;
118 }
119
122 int n_samples() override {
123 return number_of_samples;
124 }
125
128 vector<string> getSampleNames() override {
129 return vInfo->sampleNames;
130 }
131
134 string getStreamType() override {
135 return toString( param.fileType);
136 }
137
139 return vs.getChromRanges();
140 }
141
142 bool getNextChunk( DataChunk<arma::mat> & chunk, const bool &useFilter = true) override {
143
144 // Update matDosage and vInfo for the chunk
145 bool ret = getNextChunk_helper();
146
147 if( useFilter ){
148 // modifies matDosage and vInfo directly
149 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
150 }
151
152 arma::mat M(matDosage.data(), number_of_samples, vInfo->size(), false, true);
153
154 chunk = DataChunk<arma::mat>( M, vInfo );
155
156 return ret;
157 }
158
159 bool getNextChunk( DataChunk<arma::sp_mat> & chunk, const bool &useFilter = true) override {
160
161 // Update matDosage and vInfo for the chunk
162 bool ret = getNextChunk_helper();
163
164 if( useFilter ){
165 // modifies matDosage and vInfo directly
166 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
167 }
168
169 arma::mat M(matDosage.data(), number_of_samples, vInfo->size(), false, true);
170
171 chunk = DataChunk<arma::sp_mat>( arma::sp_mat(M), vInfo );
172
173 return ret;
174 }
175
176 #ifndef DISABLE_EIGEN
177 bool getNextChunk( DataChunk<Eigen::MatrixXd> & chunk, const bool &useFilter = true) override {
178
179 // Update matDosage and vInfo for the chunk
180 bool ret = getNextChunk_helper();
181
182 if( useFilter ){
183 // modifies matDosage and vInfo directly
184 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
185 }
186
187 Eigen::MatrixXd M = Eigen::Map<Eigen::MatrixXd>(matDosage.data(), number_of_samples, vInfo->size());
188
189 chunk = DataChunk<Eigen::MatrixXd>( M, vInfo );
190
191 return ret;
192 }
193
194 bool getNextChunk( DataChunk<Eigen::SparseMatrix<double> > & chunk, const bool &useFilter = true) override {
195
196 // Update matDosage and vInfo for the chunk
197 bool ret = getNextChunk_helper();
198
199 if( useFilter ){
200 // modifies matDosage and vInfo directly
201 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
202 }
203
204 Eigen::MatrixXd M = Eigen::Map<Eigen::MatrixXd>(matDosage.data(), number_of_samples, vInfo->size());
205
206 chunk = DataChunk<Eigen::SparseMatrix<double>>( M.sparseView(), vInfo );
207
208 return ret;
209 }
210 #endif
211
212 #ifndef DISABLE_RCPP
213 bool getNextChunk( DataChunk<Rcpp::NumericMatrix> & chunk, const bool &useFilter = true) override {
214
215 // Update matDosage and vInfo for the chunk
216 bool ret = getNextChunk_helper();
217
218 if( useFilter ){
219 // modifies matDosage and vInfo directly
220 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
221 }
222
223 Rcpp::NumericMatrix M(number_of_samples, vInfo->size(), matDosage.data());
224 colnames(M) = Rcpp::wrap( vInfo->getFeatureNames() );
225 rownames(M) = Rcpp::wrap( vInfo->sampleNames );
226
227 chunk = DataChunk<Rcpp::NumericMatrix>( M, vInfo );
228
229 return ret;
230 }
231 #endif
232
233 bool getNextChunk( DataChunk<vector<double> > & chunk, const bool &useFilter = true) override {
234
235 // Update matDosage and vInfo for the chunk
236 bool ret = getNextChunk_helper();
237
238 if( useFilter ){
239 // modifies matDosage and vInfo directly
240 applyVariantFilter(matDosage, vInfo, number_of_samples, getMAF(), getMinVariance() );
241 }
242
243 chunk = DataChunk<vector<double> >( matDosage, vInfo );
244
245 return ret;
246 }
247
248 private:
249 size_t number_of_samples = 0;
250 int n_requested_variants = 0;
251 int currentIdx;
252 vector<double> matDosage;
253 vector<int> varIdx;
254 VariantInfo *vInfo = nullptr;
255 RPgenReader *pg = nullptr;
256 RPvar *pvar = nullptr;
257 DataTable dt;
258 VariantSet vs;
259 string fileIdx;
260 int n_samples_psam;
261 vector<int> sampleIdx1;
262 FileType genoFileType;
263 bool missingToMean;
264
265 bool getNextChunk_helper (){
266
267 // clear data, but keep allocated capacity
268 matDosage.clear();
269 vInfo->clear();
270
271 // number of variants in this chunk
272 int chunkSize = min(param.chunkSize, n_requested_variants - currentIdx);
273 chunkSize = max(chunkSize, 0);
274
275 // if no variants remain, return false
276 if( chunkSize == 0) return false;
277
278 // indeces of variants in chunk
279 auto end = min( varIdx.begin() + currentIdx + chunkSize, varIdx.end());
280 vector<int> varIdx_sub = {varIdx.begin() + currentIdx, end};
281
282 // read dosage into matDosage using
283 // 1-based indeces
284 // vector<int> varIdx_sub1(varIdx_sub);
285 vector<int> varIdx_sub1(varIdx_sub);
286 for(int &i : varIdx_sub1) i++;
287
288 pg->ReadList( matDosage, varIdx_sub1, missingToMean);
289
290 // Populate vInfo from DataTable
291 // Looking up column in DataTable is slow
292 // so do it once and process vector
293 vInfo->addVariants( subset_vector(dt["CHROM"], varIdx_sub),
294 subset_vector(dt["POS"], varIdx_sub),
295 subset_vector(dt["ID"], varIdx_sub),
296 subset_vector(dt["REF"], varIdx_sub),
297 subset_vector(dt["ALT"], varIdx_sub));
298
299 // increment current index
300 currentIdx += varIdx_sub.size();
301
302 return true;
303 }
304
305 // bool getNextChunk_helper(){return getNextChunk_helper2();}
306
307 /* Parse PVAR file of variant positions and IDs
308 */
309 void process_variants(){
310
311 // if file is PGEN
312 if( genoFileType == PGEN ){
313 // Name of .pvar file based on replacing .pgen$
314 fileIdx = regex_replace(param.file, regex("pgen$"), "pvar");
315
316 // Read .pvar file into DataTable
317 // column names are defined by line starting with "#CHROM"
318 // lines before this are ignored
319 dt = DataTable( fileIdx, "#CHROM" );
320
321 // if file is BED
322 }else if( genoFileType == PBED ){
323
324 // Name of .bim file based on replacing .pgen$
325 fileIdx = regex_replace(param.file, regex("bed$"), "bim");
326
327 // Read BIM file with no headerKey
328
329 dt = DataTable( fileIdx );
330 dt.setColNames({"CHROM", "ID", "CM", "POS", "ALT", "REF"});
331 }else{
332 throw logic_error("Not valid genotype file extension: " + param.file);
333 }
334
335 // initialize variant set
336 vs = VariantSet(dt["CHROM"], stoi_vec(dt["POS"]));
337
338 // Set genomic regions regions
339 setRegions( param.regions );
340 }
341
342 void process_samples(){
343
344 DataTable dt2;
345
346 string fileSamples;
347
348 // Get path to samples file
349 // if custom samples file is given
350 if( param.fileSamples.compare("") != 0){
351 fileSamples = param.fileSamples;
352 }else{
353 if( genoFileType == PGEN ){
354 // Name of .psam file based on replacing .pgen$
355 fileSamples = regex_replace(param.file, regex("pgen$"), "psam");
356 }else if( genoFileType == PBED ){
357 // Name of .fam file based on replacing .pgen$
358 fileSamples = regex_replace(param.file, regex("bed$"), "fam");
359 }
360 }
361
362 // Read sample file depending on extension
363 if( regex_search(fileSamples, regex("psam$")) ){
364 // Read .psam file into DataTable
365 // column names are define by line starting with "#IID"
366 // lines before this are ignored
367 dt2 = DataTable(fileSamples, "#IID");
368
369 }else if( regex_search(fileSamples, regex("fam$")) ){
370
371 // Read BIM file with no headerKey
372 // space delimited entries
373 dt2 = DataTable( fileSamples, "");
374
375 // set column names
376 vector<string> names = {"FID", "IID", "PID", "MID", "SEX", "PHENO"};
377 vector<string> names_sub(names.begin(), names.begin() + dt.ncols());
378 dt2.setColNames(names_sub);
379 }else{
380 throw logic_error("Not valid sample file extension: " + fileSamples);
381 }
382
383 // Sample names from PSAM file
384 vector<string> SamplesNames = dt2["IID"];
385 n_samples_psam = SamplesNames.size();
386
387 // indeces of samples to include
388 vector<int> sampleIdx;
389
390 // Filter samples
391 // If param.samples contains entries
392 if( param.samples.compare("-") != 0 ){
393
394 // get sample ids from param.samples
395 vector<string> requestedSamples;
396
397 // split delmited string into vector
398 boost::split(requestedSamples, param.samples, boost::is_any_of("\t,\n"));
399
400 // Use unordered_map linking sample id to index
401 // for fast searching
402 // sn = sampleNames
403 unordered_map<string,int> map_sn;
404 for(int i=0; i<SamplesNames.size(); i++){
405 map_sn.emplace(SamplesNames[i], i);
406 }
407
408 // For each requested sample ID,
409 // get its index in the PSAM file
410 for( string & name : requestedSamples){
411 if( map_sn.count(name) == 0){
412 throw logic_error("Sample id not found: " + name);
413 }
414 sampleIdx.push_back( map_sn[name] );
415 }
416 sort(sampleIdx.begin(), sampleIdx.end());
417
418 // Get the requested sample ID's
419 // sorted according to the PSAM file
420 vector<string> requestedSamples_ordered;
421 for(int i : sampleIdx){
422 requestedSamples_ordered.push_back( SamplesNames[i] );
423 }
424
425 // populate VariantInfo with requested sample IDs
426 // in the order from the PSAM file
427 vInfo = new VariantInfo( requestedSamples_ordered );
428 number_of_samples = requestedSamples_ordered.size();
429
430 // convert to 1-based indeces
431 sampleIdx1.assign(sampleIdx.begin(), sampleIdx.end());
432 for(int &i : sampleIdx1) i++;
433 }else{
434 vInfo = new VariantInfo( SamplesNames );
435 number_of_samples = SamplesNames.size();
436
437 // set sampleIdx1 to be seq(1, number_of_samples)
438 sampleIdx1.resize(number_of_samples);
439 iota(begin(sampleIdx1), end(sampleIdx1), 1);
440 }
441 }
442
443};
444
445}
446
447#endif
Definition GenomicDataStream_virtual.h:34
Definition DataTable.h:28
double getMinVariance()
Definition GenomicDataStream_virtual.h:200
double getMAF() const
Definition GenomicDataStream_virtual.h:206
Param param
Definition GenomicDataStream_virtual.h:254
GenomicDataStream()
Definition GenomicDataStream_virtual.h:168
Definition GenomicRanges.h:25
const int size() const
Definition GenomicRanges.h:61
Definition VariantInfo.h:24
void clear()
Definition VariantInfo.h:137
void addVariants(const vector< string > &chr, const vector< string > &pos, const vector< string > &id, const vector< string > &allele1, const vector< string > &allele2)
Definition VariantInfo.h:53
Definition VariantSet.h:45
vector< string > getSampleNames() override
Definition pgenstream.h:128
bool getNextChunk(DataChunk< Rcpp::NumericMatrix > &chunk, const bool &useFilter=true) override
Definition pgenstream.h:213
bool getNextChunk(DataChunk< vector< double > > &chunk, const bool &useFilter=true) override
Definition pgenstream.h:233
~pgenstream()
Definition pgenstream.h:81
bool getNextChunk(DataChunk< Eigen::SparseMatrix< double > > &chunk, const bool &useFilter=true) override
Definition pgenstream.h:194
string getStreamType() override
Definition pgenstream.h:134
bool getNextChunk(DataChunk< arma::mat > &chunk, const bool &useFilter=true) override
Definition pgenstream.h:142
pgenstream(const Param &param)
Definition pgenstream.h:46
void setRegions(const vector< string > &regions) override
Definition pgenstream.h:95
bool getNextChunk(DataChunk< Eigen::MatrixXd > &chunk, const bool &useFilter=true) override
Definition pgenstream.h:177
GenomicRanges getChromRanges() override
Definition pgenstream.h:138
int n_samples() override
Definition pgenstream.h:122
bool getNextChunk(DataChunk< arma::sp_mat > &chunk, const bool &useFilter=true) override
Definition pgenstream.h:159
pgenstream()
Definition pgenstream.h:42
Definition bgenstream.h:33
FileType
Definition utils.h:232
@ PBED
Definition utils.h:238
@ PGEN
Definition utils.h:237
Definition GenomicDataStream_virtual.h:65
int chunkSize
Definition GenomicDataStream_virtual.h:157