Writing to multiple files using multithreading and limiting the number of records to 20000 per file - PullRequest
0 votes
/ June 8, 2011

I have a tricky situation here.

I am developing a multithreaded application. I create and use 5 threads. I want the program to write to 5 files concurrently and to limit each file to 20000 records. I get the data from a DB, and the data is different for each of the 5 threads, so the right data must end up in each file. Each thread therefore needs access to its own file writer, so that it writes the right data to the right file, and each file needs its own counter.
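The per-thread requirement described above (own file, own counter, at most 20000 records per file) can be sketched with a small writer object that each thread owns exclusively. This is only an illustration under assumptions: `RollingFileWriter`, its `_<n>.txt` file naming and its plain-text line format are hypothetical and are not part of the code below, which writes XML through `EntityFileWriter`.

```java
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper: writes records to numbered files, at most maxRecords per file. */
final class RollingFileWriter implements Closeable {
    private final Path dir;
    private final String prefix;
    private final int maxRecords;
    private BufferedWriter current;  // file currently open, or null if none
    private int recordsInCurrent;    // counter for the file currently open
    private int fileIndex;           // number of files opened so far

    RollingFileWriter(Path dir, String prefix, int maxRecords) {
        if (maxRecords <= 0) {
            throw new IllegalArgumentException("maxRecords must be positive");
        }
        this.dir = dir;
        this.prefix = prefix;
        this.maxRecords = maxRecords;
    }

    /** Appends one record, first opening a new file if none is open or the current one is full. */
    void write(String record) throws IOException {
        if (current == null || recordsInCurrent >= maxRecords) {
            roll();
        }
        current.write(record);
        current.newLine();
        recordsInCurrent++;
    }

    /** Closes the current file (if any) and opens the next numbered file. */
    private void roll() throws IOException {
        close();
        fileIndex++;
        Path next = dir.resolve(prefix + "_" + fileIndex + ".txt");
        current = Files.newBufferedWriter(next, StandardCharsets.UTF_8);
        recordsInCurrent = 0;
    }

    int filesWritten() {
        return fileIndex;
    }

    @Override
    public void close() throws IOException {
        if (current != null) {
            current.close();
            current = null;
        }
    }
}
```

The class is deliberately not thread-safe: if every thread creates its own instance (for example `new RollingFileWriter(dir, collectionName, 20000)`), the writer and its counter are never shared, so no locking is needed around them.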

public class AuditMissingRelationshipGuidEntitiesToXml
{   
    private Logger logger = ERDLoggerUtility.getLogger( this.getClass().getName());
    private DbPartitionedConnections myDbPConn = null;
    private RelationshipRepositoryDao myRRDao = null;
    private MrdPartitionedConnections myMRDPartitionConn = null;
    private MasterRecordDao myMRDRDao = null;
    private String myCollectionName = null;
    private String myOutputDir = null;
    private String myOutputFileName = null;
    private EntityFileWriter myEntityFileWriter = null;
    private NovusDocumentGuidCaseFinder novusDocGuidCaseFinder = null;
    //private int entityFileWriterCount = 0;
    private static List <String> collections = new ArrayList<String>(); 
    private static final String DBDRIVER = "oracle.jdbc.driver.OracleDriver";
    private BasicDataSource ds;
    private static final int NTHREADS = 30;

    private void setupDatabases() throws Exception {        
        ErdProperties props = ErdProperties.getProps();     
        this.myDbPConn = new RrdPartitionedConnections();
        this.myRRDao = new OracleRelationshipRepositoryDao(this.myDbPConn);
        this.myMRDPartitionConn = new MrdPartitionedConnections();
        this.myMRDRDao = new OracleMasterRecordDao(this.myMRDPartitionConn);
        ds = new BasicDataSource();
        ds.setDriverClassName(DBDRIVER);
        ds.setUrl(props.getProperty("wfdb.uri"));
        ds.setUsername(props.getProperty("wfdb.user"));
        ds.setPassword(props.getProperty("wfdb.password"));


        novusDocGuidCaseFinder = new NovusDocumentGuidCaseFinder(ds);
        novusDocGuidCaseFinder.setNovusCollection(myCollectionName);
}

private void setCommandLineProperties(String[] theArgs) throws Exception {
        if (theArgs == null || theArgs.length != 3 ||
            theArgs[0] == null || theArgs[0].length() == 0 ||
            theArgs[1] == null || theArgs[1].length() == 0 ||
            theArgs[2] == null || theArgs[2].length() == 0) {

              System.err.println("You must provide 3 arguments! See below.");
              System.err.println("\t1st Argument) -- Output directory location. ex: \"/ct-data/RRDGuids\"");
              System.err.println("\t2nd Argument) -- Output file name. ex: \"rrd_seq_guid\"");
              System.err.println("\t3rd Argument) -- Collection name. ex: \"N_ARREST\"");

              throw new Exception("Invalid Arguments, see console output.");
        } 
        else {
              // Set the output directory location
              this.myOutputDir = theArgs[0];
              System.out.println("setCommandLineProperties() -- 1st Argument) Output directory location = [" + this.myOutputDir + "]");

              // Set the output file name
              this.myOutputFileName = theArgs[1];
              System.out.println("setCommandLineProperties() -- 2nd Argument) Output file name = [" + this.myOutputFileName + "]");

              // Set the collection name
              //this.myCollectionName = theArgs[2];
              //System.out.println("setCommandLineProperties() -- 3rd Argument) Collection name = [" + this.myCollectionName + "]");
    }
}

/**
 * Add the rel seq and entity guid.
 * 
 * @param theRelSeq
 * @param theEntityGuid
 * @throws Exception
 */
private synchronized void addEntity(String theRelSeq, String theEntityGuid, 
        String theRelGuid, EntityFileWriter myEntityFileWriter) throws Exception {

    /*
    if (this.myEntityFileWriter == null) {
        System.out.println("file for  :"+collectionName);
        this.myEntityFileWriter = new EntityFileWriter(new File(myOutputDir), myOutputFileName, collectionName, System.currentTimeMillis());
    }
    */  

    myEntityFileWriter.addEntity(theRelSeq, theEntityGuid, theRelGuid);

}

private Set<String> locateEntityGuidsInMRD(List<String> theRRDGuids, String collectionName) throws Exception {
    Set<String> foundEntityGuids = new HashSet<String>();

    System.out.println("Searching MRD for entity guids (AKA: BASE_GUID)... for : " + collectionName);

    MasterRecordDao myMRDRDao = null;

    synchronized(collectionName.intern()){
        myMRDRDao = new OracleMasterRecordDao(this.myMRDPartitionConn);
        /*for(String li : theRRDGuids){
            System.out.println(li);
        }*/
        List<PersonEntity> personEntities = myMRDRDao.getByEntityGuid(theRRDGuids);
        System.out.println("Found: " + (personEntities == null ? "0" : personEntities.size()) + " entities in MRD for :" + collectionName);

        if (personEntities != null) {
            for (PersonEntity pe : personEntities) {
                foundEntityGuids.add(pe.getEntityGuid());   
            }
        }
    }

    myMRDRDao=null;
    return foundEntityGuids;
}

public void determineOrphanRelationships(String collectionName) throws Exception {
    Long startTime = null;
    Long endTime = null;

    startTime = System.currentTimeMillis();

    System.out.println("Started on: " + new Timestamp(startTime).toString() + "::" + collectionName);

    synchronized(collectionName.intern()){  
        EntityFileWriter myEntityFileWriter = null;

        Map<EntityDocRelationship, String> entityDocRels = new HashMap<EntityDocRelationship, String>();

        EntityDocRelationshipIterator entityDocRelIter = myRRDao.getByCollection(collectionName);

        int entityFileWriterCount = 0;

        while (entityDocRelIter.hasNext()) {

            /*
             * Reference: OracleRelationshipRepositoryDao.populateFrom()
             * 
             * EntityDocRelationship.getPk() = EntityDocRelationship.REL_SEQ (1st column in prepared statement)
             * EntityDocRelationship.getRelGuid() = EntityDocRelationship.REL_GUID (4th position in prepared statement)
             * EntityDocRelationship.getEntityGuid() = EntityDocRelationship.BASE_GUID (5th position in prepared statement)
             * EntityDocRelationship.getDocGuid() = EntityDocRelationship.TARGET_GUID (6th position in prepared statement)
             */

            EntityDocRelationship entityDocRel = (EntityDocRelationship)entityDocRelIter.next();

            entityDocRels.put(entityDocRel, entityDocRel.getEntityGuid());

            // Once the batch holds 20000 relationships, process it and start a new, empty batch.
            if(entityDocRels.size() >= 20000) {

                entityFileWriterCount = printNotFoundEntityGuids(entityDocRels, collectionName, 
                        entityFileWriterCount, myEntityFileWriter, false);
                entityDocRels.clear();
            }

        }


        if(entityDocRels.size() > 0){

            entityFileWriterCount = printNotFoundEntityGuids(entityDocRels, collectionName, 
                    entityFileWriterCount, myEntityFileWriter, true);
            endTime = System.currentTimeMillis();
            entityFileWriterCount = 0;

            System.out.println("Ended on:" + endTime);
            System.out.println("Run time: " + (endTime - startTime));

        }
    }

}


private int printNotFoundEntityGuids(Map<EntityDocRelationship, String> entityDocRels, String collectionName, 
        int entityFileWriterCount, EntityFileWriter myEntityFileWriter, boolean closeWriter) throws Exception{

    // Do work with the distinct set of entity guids.

    Set<String> foundEntityGuids = locateEntityGuidsInMRD(new ArrayList<String>(entityDocRels.values()), collectionName);

    System.out.println(foundEntityGuids.size());

    System.out.println("Printing not found entity guid list...");

    for (Map.Entry<EntityDocRelationship, String> entry : entityDocRels.entrySet()) {
        if (!foundEntityGuids.contains(entry.getValue())) {

            synchronized(this){ 
                // Open a file lazily, on the first orphan written since the previous file was closed.
                if (myEntityFileWriter == null) {

                    System.out.println("file for  :"+collectionName);
                    myEntityFileWriter = new EntityFileWriter(new File(myOutputDir), myOutputFileName, 
                            collectionName, System.currentTimeMillis());
                }

                entityFileWriterCount++;

                addEntity(String.valueOf(entry.getKey().getPk()), entry.getValue(), entry.getKey().getRelGuid(), myEntityFileWriter);

                System.out.println(collectionName + " :: Relationship pk: " + entry.getKey().getPk() + " is orphan to Entity: " + entry.getValue());

                // Roll over to a new file once the current one holds 20000 records.
                if(entityFileWriterCount >= 20000){

                    System.out.println("Closing file writer...");
                    myEntityFileWriter.endDocument();
                    myEntityFileWriter = null;
                    entityFileWriterCount = 0;
                    System.out.println("File writer closed.");

                }

            }

        }
    }

    System.out.println("Finished printing.");

    // The writer is local to this call, so it must be closed here;
    // otherwise the reference is lost when the method returns.
    if (myEntityFileWriter != null) {
        System.out.println("Closing file writer...");
        myEntityFileWriter.endDocument();
        entityFileWriterCount = 0;
        System.out.println("File writer closed.");
    }

    return entityFileWriterCount;

}

public void deleteRelationships(final String collectionName) throws Exception {

    SAXParserFactory factory = SAXParserFactory.newInstance();
    SAXParser saxParser = factory.newSAXParser();

    File outputDir = new File(this.myOutputDir);

    FilenameFilter filter = new FilenameFilter() { 
        public boolean accept(File dir, String name) { 
            return name.startsWith("rrd_seq_guid_"+collectionName); 
        } 
    }; 

    String[] files = outputDir.list(filter);

    for (String file : files) {
        logger.info("Getting relationships from file: " + file + "...");

        EntityFileHandler parsedFile = new EntityFileHandler();
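        // Resolve the file against the output directory; plain string concatenation drops the path separator.
        saxParser.parse(new File(outputDir, file), parsedFile);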

        Map<String, String> relSeqsGuids = new HashMap<String, String>();
        for (EntityFileHandler.EntityFile entityFile : parsedFile.getDocuments()){
            relSeqsGuids.put(entityFile.getRelSeq(), entityFile.getRelGuid());
        }

        /*logger.info("Deleting relationship sequence guids...");
        myRRDao.deleteRelSeqGuids(relSeqsGuids.keySet());
        logger.info("Relationship sequence guids deleted.");*/
        /*
        logger.info("Validating relationship guids in Novus...");
        List<String> validNovusRelGuids = novusDocGuidCaseFinder.search(
                relSeqsGuids.values().toArray(new String[relSeqsGuids.values().size()]));
        logger.info("Relationship guids are valid in Novus.");*/

        List<String> validNovusRelGuids = new ArrayList<String>();
        validNovusRelGuids.addAll(relSeqsGuids.values());

        logger.info("Sending delete files to Novus...");
        sendDeleteFileToNovus(file, validNovusRelGuids, collectionName);
        logger.info("Delete files sent.");


    }
}

/**
 * This method will send delete files to Novus based on the list
 * of relationship Guids passed in.
 * 
 * Example delete file:
 * 
 * <deletes>
 *     <collection>
 *         <name>...</name>
 *         <guid></guid>
 *         .
 *         . N times
 *         .
 *         <guid></guid>
 *     </collection>
 * </deletes>
 * 
 * @param theRelGuids - ERD Relationship Guids to send a delete file for.
 * @throws Exception 
 */
public void sendDeleteFileToNovus(String theFile, List<String> theRelGuids, String collectionName) {
    logger.info("Sending delete files to Novus...");
    try{


    Document document = new DocumentImpl();

    // Create the Delete Element (aka: root): <Deletes>
    Element deleteElement = document.createElement("deletes");

    /*
     * Create the Collection Element <Collection> and then add it to the Deletes Element.
     * 
     * <deletes>
     *     <collection></collection>
     * </deletes>
     */
    Element collectionElement = document.createElement("collection");
    deleteElement.appendChild(collectionElement);

    /*
     * Create the Name Element <Name> with the Collection name
     * and then add it to the Collection Element.
     * 
     * <deletes>
     *     <collection>
     *         <name>...</name>
     *     </collection>
     * </deletes>
     */
    Element nameElement = document.createElementNS(null, "name");
    nameElement.appendChild(document.createTextNode(collectionName));
    collectionElement.appendChild(nameElement);

    if (theRelGuids != null && theRelGuids.size() > 0) {
        /*
         * For each Relationship Guid add a Guid Element to the Collection
         * Element with the Relationship Guid.
         * 
         * <deletes>
         *     <collection>
         *         <name>...</name>
         *         <guid>...</guid>
         *     </collection>
         * </deletes>
         */
        for(String relGuid : theRelGuids){
            Element relGuidElement = document.createElementNS(null, "guid");
            relGuidElement.appendChild(document.createTextNode(relGuid));
            collectionElement.appendChild(relGuidElement);
        }
    }

    logger.info("Add delete element text content: " + deleteElement.getTextContent());
    document.appendChild(deleteElement);

    String deleteFile = 
        File.separator + "ct-data" + 
        File.separator + "erd" + 
        File.separator + "wip" + 
        File.separator + "relAtishDeletes" + 
        File.separator + theFile;

    logger.info("Creating Novus delete file: " + deleteFile + "...");
    FileOutputStream outStream = new FileOutputStream(deleteFile);
    try {
        OutputFormat of = new OutputFormat("XML","ISO-8859-1",true);
        of.setIndent(1);
        of.setIndenting(true);

        XMLSerializer serializer = new XMLSerializer(outStream, of);
        serializer.asDOMSerializer();
        serializer.serialize(document.getDocumentElement());
    } finally {
        // Close the stream even if serialization fails.
        outStream.close();
    }
    logger.info("Delete file: " + deleteFile + " created.");
    }
    catch(Exception e){
        e.printStackTrace();
    }
    /*MatchManageCommand mmc = new MatchManageCommand();
    String publishXML = mmc.getPublishXml(-1, this.myCollectionName, "-1", theFile);
    mmc.ms.putMessageOnQueue(publishXML, 2);*/
}

public void getCollections()
{
    Connection connection = null;
    Statement statement = null;
    ResultSet resultSet = null;


    try {
        connection = ds.getConnection();

        String query = "select novus_collection from erd_workflow.collection_info " + 
        "where Novus_collection in ('N-SALE', 'N-UCC00', 'N-UCC01', 'N-UCC02', 'N-UCC03') order by novus_collection asc";



        statement = connection.createStatement();

        resultSet = statement.executeQuery(query);

        while (resultSet.next()) {              
            // Store the Novus Collection and associated Product.
            collections.add(resultSet.getString(1));
        }
    }
    catch (SQLException sqle) {
        sqle.printStackTrace();

    } catch(Exception e){
        e.printStackTrace();
    }
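    finally {
        // Close each resource on its own so that one failure does not leak the others.
        try {
            if (resultSet != null) resultSet.close();
        } catch (SQLException sqle) {
            sqle.printStackTrace();
        }
        try {
            if (statement != null) statement.close();
        } catch (SQLException sqle) {
            sqle.printStackTrace();
        }
        try {
            if (connection != null) connection.close();
        } catch (SQLException sqle) {
            sqle.printStackTrace();
        }
    }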
}

public static void main(String[] args) throws Exception{
    AuditMissingRelationshipGuidEntitiesToXml audit = new AuditMissingRelationshipGuidEntitiesToXml();

    ExecutorService executor = Executors.newFixedThreadPool(NTHREADS); // makes thread pool 

    try {
        audit.setupDatabases();
        audit.getCollections();
        audit.setCommandLineProperties(args);

        //audit.runThreads(0, collections.size());
        int start = 0; 

        while(start < collections.size()){

            Runnable worker = audit.new RunThreads(audit, collections.get(start) );
            /*
             * Executes the given command at some time in the future.  The command
             * may execute in a new thread, in a pooled thread, or in the calling
             * thread*/ 
            executor.execute(worker); 

            start++; // increment to get next collection 

        }

         /*
         * Initiates an orderly shutdown in which previously submitted
         * tasks are executed, but no new tasks will be
         * accepted. Invocation has no additional effect if already shut
         * down. */
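        executor.shutdown();

        // shutdown() does not block, so wait for the submitted tasks to complete
        // before reporting (TimeUnit is java.util.concurrent.TimeUnit).
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

        System.out.println("Finished all threads");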

        //  System.out.println("done with current threads..... creating new threads... ");

    }catch(InterruptedException e) {
         System.out.println("Interrupted :");
         e.printStackTrace();
      }catch(RejectedExecutionException e){
          e.printStackTrace();
      }

}


public String getMyCollectionName() {
    return myCollectionName;
}

public void setMyCollectionName(String myCollectionName) {
    this.myCollectionName = myCollectionName;
}

public class RunThreads implements Runnable{

    AuditMissingRelationshipGuidEntitiesToXml auditThread;
    private String myCollection;
    private int sleepTime; // random sleep time for thread
    private Random generator = new Random();

    public RunThreads(AuditMissingRelationshipGuidEntitiesToXml audit, String collection){

        try{
            this.auditThread = audit;
            myCollection = collection;

            // pick random sleep time between 0 and 20 seconds (the value is in milliseconds)
            sleepTime = generator.nextInt(20000);

        }catch(Exception e){
            System.out.println("Interrupted");
        }
    }

    /* Lock on the interned collection name and call the audit methods for that collection */

    public void run() {

        synchronized(this.myCollection.intern()) { // synchronized block

            try {
                //Thread.sleep(sleepTime);
                System.out.println("Collection started : "+ this.myCollection);
                this.auditThread.determineOrphanRelationships(this.myCollection);
                this.auditThread.deleteRelationships(this.myCollection);



            } catch (Exception e) {

                e.printStackTrace();
            }

          }

    }

}
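
The `main` method and the `RunThreads` class above start one task per collection. Below is a minimal sketch of that pattern, assuming each task works only on its own data, so that no `synchronized`, `wait()` or `notify()` is required. `PerCollectionTasks`, `toBatches` and `countBatches` are hypothetical names, and in-memory lists stand in for the database iterator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class PerCollectionTasks {

    /** Splits one collection's records into batches of at most batchSize; touches only local state. */
    static List<List<String>> toBatches(List<String> records, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<String>> batches = new ArrayList<>();
        for (int from = 0; from < records.size(); from += batchSize) {
            int to = Math.min(from + batchSize, records.size());
            batches.add(new ArrayList<>(records.subList(from, to)));
        }
        return batches;
    }

    /** Runs one task per collection and returns the number of batches per collection once all tasks finish. */
    static Map<String, Integer> countBatches(Map<String, List<String>> recordsByCollection,
                                             int threads, int batchSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit one independent task per collection.
            Map<String, Future<Integer>> pending = new LinkedHashMap<>();
            for (Map.Entry<String, List<String>> e : recordsByCollection.entrySet()) {
                final List<String> records = e.getValue();
                Callable<Integer> task = () -> toBatches(records, batchSize).size();
                pending.put(e.getKey(), pool.submit(task));
            }
            // Future.get() blocks until the corresponding task is done.
            Map<String, Integer> result = new LinkedHashMap<>();
            for (Map.Entry<String, Future<Integer>> e : pending.entrySet()) {
                result.put(e.getKey(), e.getValue().get());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real run, each task would write its batches to its own file instead of only counting them. The point of the sketch is that the caller learns a collection is finished from `Future.get()` rather than from a shared lock.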